refactor: Moved persistence-logic into a pluggable strategy
authorKai Moritz <kai@juplo.de>
Wed, 28 Dec 2022 16:53:46 +0000 (17:53 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 28 Dec 2022 17:03:53 +0000 (18:03 +0100)
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java
src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java [new file with mode: 0644]

index 437ff1f..17d2b1f 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.api;
 
 import de.juplo.kafka.chat.backend.domain.Chatroom;
+import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
@@ -20,13 +21,14 @@ import java.util.UUID;
 public class ChatBackendController
 {
   private final Map<UUID, Chatroom> chatrooms = new HashMap<>();
+  private final PersistenceStrategy persistenceStrategy;
   private final Clock clock;
 
 
   @PostMapping("create")
   public Chatroom create(@RequestBody String name)
   {
-    Chatroom chatroom = new Chatroom(UUID.randomUUID(), name);
+    Chatroom chatroom = new Chatroom(UUID.randomUUID(), name, persistenceStrategy);
     chatrooms.put(chatroom.getId(), chatroom);
     return chatroom;
   }
index df6794a..c05fda0 100644 (file)
@@ -19,7 +19,7 @@ public class Chatroom
   private final UUID id;
   @Getter
   private final String name;
-  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+  private final PersistenceStrategy persistence;
   private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
 
   synchronized public Mono<Message> addMessage(
@@ -28,41 +28,15 @@ public class Chatroom
       String user,
       String text)
   {
-    return persistMessage(id, timestamp, user, text)
+    return persistence
+        .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
         .doOnNext(message -> sink.tryEmitNext(message).orThrow());
   }
 
-  private Mono<Message> persistMessage(
-      Long id,
-      LocalDateTime timestamp,
-      String user,
-      String text)
-  {
-    Message.MessageKey key = Message.MessageKey.of(user, id);
-    Message message = new Message(key, (long)messages.size(), timestamp, text);
-
-    Message existing = messages.get(key);
-    if (existing != null)
-    {
-      log.info("Message with key {} already exists; {}", key, existing);
-      if (!message.equals(existing))
-        throw new MessageMutationException(message, existing);
-      return Mono.empty();
-    }
-
-    messages.put(key, message);
-    return Mono
-        .fromSupplier(() -> message)
-        .log();
-  }
 
   public Mono<Message> getMessage(String username, Long messageId)
   {
-    return Mono.fromSupplier(() ->
-    {
-      Message.MessageKey key = Message.MessageKey.of(username, messageId);
-      return messages.get(key);
-    });
+    return persistence.getMessage(Message.MessageKey.of(username, messageId));
   }
 
   public Flux<Message> listen()
@@ -72,13 +46,6 @@ public class Chatroom
 
   public Flux<Message> getMessages(long first, long last)
   {
-    return Flux.fromStream(messages
-        .values()
-        .stream()
-        .filter(message ->
-        {
-          long serial = message.getSerialNumber();
-          return serial >= first && serial <= last;
-        }));
+    return persistence.getMessages(first, last);
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java
new file mode 100644 (file)
index 0000000..452a62d
--- /dev/null
@@ -0,0 +1,20 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.Value;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+
+
+public interface PersistenceStrategy
+{
+  Mono<Message> persistMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text);
+
+  Mono<Message> getMessage(Message.MessageKey key);
+
+  Flux<Message> getMessages(long first, long last);
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java
new file mode 100644 (file)
index 0000000..f6c76fa
--- /dev/null
@@ -0,0 +1,69 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+
+@Component
+@RequiredArgsConstructor
+@Slf4j
+public class InMemoryPersistenceStrategy implements PersistenceStrategy
+{
+  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+
+  @Override
+  public Mono<Message> persistMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    Message message = new Message(key, (long)messages.size(), timestamp, text);
+
+    Message existing = messages.get(key);
+    if (existing != null)
+    {
+      log.info("Message with key {} already exists; {}", key, existing);
+      if (!message.equals(existing))
+        throw new MessageMutationException(message, existing);
+      return Mono.empty();
+    }
+
+    messages.put(key, message);
+    return Mono
+        .fromSupplier(() -> message)
+        .log();
+  }
+
+  @Override
+  public Mono<Message> getMessage(Message.MessageKey key)
+  {
+    return Mono.fromSupplier(() -> messages.get(key));
+  }
+
+  @Override
+  public Flux<Message> getMessages(long first, long last)
+  {
+    return Flux.fromStream(messages
+        .values()
+        .stream()
+        .filter(message ->
+        {
+          long serial = message.getSerialNumber();
+          return serial >= first && serial <= last;
+        }));
+  }
+}