fix: GREEN - `ChatRoomData` obeys to the added expectations.
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoomData.java
index 511b9ad..9dbeda9 100644 (file)
@@ -1,5 +1,7 @@
 package de.juplo.kafka.chat.backend.domain;
 
+import de.juplo.kafka.chat.backend.domain.exceptions.InvalidUsernameException;
+import de.juplo.kafka.chat.backend.domain.exceptions.MessageMutationException;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -17,23 +19,23 @@ public class ChatRoomData
 {
   public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
 
-  private final ChatRoomService service;
+  private final ChatMessageService service;
   private final Clock clock;
-  private final int bufferSize;
+  private final int historyLimit;
   private Sinks.Many<Message> sink;
 
 
   public ChatRoomData(
       Clock clock,
-      ChatRoomService service,
-      int bufferSize)
+      ChatMessageService service,
+      int historyLimit)
   {
-    log.info("Created ChatRoom with buffer-size {}", bufferSize);
+    log.info("Created ChatRoom with history-limit {}", historyLimit);
     this.clock = clock;
     this.service = service;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
     // @RequiredArgsConstructor unfortunately not possible, because
-    // the `bufferSize` is not set, if `createSink()` is called
+    // the `historyLimit` is not set, if `createSink()` is called
     // from the variable declaration!
     this.sink = createSink();
   }
@@ -76,7 +78,7 @@ public class ChatRoomData
   }
 
 
-  public ChatRoomService getChatRoomService()
+  public ChatMessageService getChatRoomService()
   {
     return service;
   }
@@ -108,7 +110,7 @@ public class ChatRoomData
   {
     return Sinks
         .many()
-        .multicast()
-        .onBackpressureBuffer(bufferSize);
+        .replay()
+        .limit(historyLimit);
   }
 }