refactor: Fixed return-types of the controller
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / Chatroom.java
index 62d33f2..8f4a797 100644 (file)
@@ -1,8 +1,7 @@
-package de.juplo.kafka.chatroom.domain;
+package de.juplo.kafka.chat.backend.domain;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -10,7 +9,6 @@ import reactor.core.publisher.Sinks;
 
 import java.time.LocalDateTime;
 import java.util.*;
-import java.util.stream.Stream;
 
 
 @RequiredArgsConstructor
@@ -21,7 +19,7 @@ public class Chatroom
   private final UUID id;
   @Getter
   private final String name;
-  private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
+  private final PersistenceStrategy persistence;
   private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
 
   synchronized public Mono<Message> addMessage(
@@ -30,41 +28,15 @@ public class Chatroom
       String user,
       String text)
   {
-    return persistMessage(id, timestamp, user, text)
+    return persistence
+        .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
         .doOnNext(message -> sink.tryEmitNext(message).orThrow());
   }
 
-  private Mono<Message> persistMessage(
-      Long id,
-      LocalDateTime timestamp,
-      String user,
-      String text)
-  {
-    Message message = new Message(id, (long)messages.size(), timestamp, user, text);
-
-    MessageKey key = new MessageKey(user, id);
-    Message existing = messages.get(key);
-    if (existing != null)
-    {
-      log.info("Message with key {} already exists; {}", key, existing);
-      if (!message.equals(existing))
-        throw new MessageMutationException(message, existing);
-      return Mono.empty();
-    }
-
-    messages.put(key, message);
-    return Mono
-        .fromSupplier(() -> message)
-        .log();
-  }
 
   public Mono<Message> getMessage(String username, Long messageId)
   {
-    return Mono.fromSupplier(() ->
-    {
-      MessageKey key = MessageKey.of(username, messageId);
-      return messages.get(key);
-    });
+    return persistence.getMessage(Message.MessageKey.of(username, messageId));
   }
 
   public Flux<Message> listen()
@@ -72,19 +44,13 @@ public class Chatroom
     return sink.asFlux();
   }
 
-  public Stream<Message> getMessages(long firstMessage)
+  public Flux<Message> getMessages()
   {
-    return messages
-        .values()
-        .stream()
-        .filter(message -> message.getSerialNumber() >= firstMessage);
+    return getMessages(0, Long.MAX_VALUE);
   }
 
-
-  @Value(staticConstructor = "of")
-  static class MessageKey
+  public Flux<Message> getMessages(long first, long last)
   {
-    String username;
-    Long messageId;
+    return persistence.getMessages(first, last);
   }
 }