feat: `ChatHome` returns an `Optional` for `getChatroom(UUID)`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / Chatroom.java
index df6794a..c05fda0 100644 (file)
@@ -19,7 +19,7 @@ public class Chatroom
   private final UUID id;
   @Getter
   private final String name;
-  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+  private final PersistenceStrategy persistence;
   private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
 
   synchronized public Mono<Message> addMessage(
@@ -28,41 +28,15 @@ public class Chatroom
       String user,
       String text)
   {
-    return persistMessage(id, timestamp, user, text)
+    return persistence
+        .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
         .doOnNext(message -> sink.tryEmitNext(message).orThrow());
   }
 
-  private Mono<Message> persistMessage(
-      Long id,
-      LocalDateTime timestamp,
-      String user,
-      String text)
-  {
-    Message.MessageKey key = Message.MessageKey.of(user, id);
-    Message message = new Message(key, (long)messages.size(), timestamp, text);
-
-    Message existing = messages.get(key);
-    if (existing != null)
-    {
-      log.info("Message with key {} already exists; {}", key, existing);
-      if (!message.equals(existing))
-        throw new MessageMutationException(message, existing);
-      return Mono.empty();
-    }
-
-    messages.put(key, message);
-    return Mono
-        .fromSupplier(() -> message)
-        .log();
-  }
 
   public Mono<Message> getMessage(String username, Long messageId)
   {
-    return Mono.fromSupplier(() ->
-    {
-      Message.MessageKey key = Message.MessageKey.of(username, messageId);
-      return messages.get(key);
-    });
+    return persistence.getMessage(Message.MessageKey.of(username, messageId));
   }
 
   public Flux<Message> listen()
@@ -72,13 +46,6 @@ public class Chatroom
 
   public Flux<Message> getMessages(long first, long last)
   {
-    return Flux.fromStream(messages
-        .values()
-        .stream()
-        .filter(message ->
-        {
-          long serial = message.getSerialNumber();
-          return serial >= first && serial <= last;
-        }));
+    return persistence.getMessages(first, last);
   }
 }