refactor: A `Message` now explicitly has a `MessageKey`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / Chatroom.java
index 115dcd6..df6794a 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.domain;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -10,7 +9,6 @@ import reactor.core.publisher.Sinks;
 
 import java.time.LocalDateTime;
 import java.util.*;
-import java.util.stream.Stream;
 
 
 @RequiredArgsConstructor
@@ -21,7 +19,7 @@ public class Chatroom
   private final UUID id;
   @Getter
   private final String name;
-  private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
+  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
   private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
 
   synchronized public Mono<Message> addMessage(
@@ -40,9 +38,9 @@ public class Chatroom
       String user,
       String text)
   {
-    Message message = new Message(id, (long)messages.size(), timestamp, user, text);
+    Message.MessageKey key = Message.MessageKey.of(user, id);
+    Message message = new Message(key, (long)messages.size(), timestamp, text);
 
-    MessageKey key = new MessageKey(user, id);
     Message existing = messages.get(key);
     if (existing != null)
     {
@@ -62,7 +60,7 @@ public class Chatroom
   {
     return Mono.fromSupplier(() ->
     {
-      MessageKey key = MessageKey.of(username, messageId);
+      Message.MessageKey key = Message.MessageKey.of(username, messageId);
       return messages.get(key);
     });
   }
@@ -72,23 +70,15 @@ public class Chatroom
     return sink.asFlux();
   }
 
-  public Stream<Message> getMessages(long first, long last)
+  public Flux<Message> getMessages(long first, long last)
   {
-    return messages
+    return Flux.fromStream(messages
         .values()
         .stream()
         .filter(message ->
         {
           long serial = message.getSerialNumber();
           return serial >= first && serial <= last;
-        });
-  }
-
-
-  @Value(staticConstructor = "of")
-  static class MessageKey
-  {
-    String username;
-    Long messageId;
+        }));
   }
 }