WAS:TMP:IS?FIX:WIP:test: `*ConfigurationIT` asserts, if restored messages can be...
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ChatRoomData.java
index d783eb4..bff56c1 100644 (file)
@@ -21,21 +21,21 @@ public class ChatRoomData
 
   private final ChatMessageService service;
   private final Clock clock;
-  private final int bufferSize;
+  private final int historyLimit;
   private Sinks.Many<Message> sink;
 
 
   public ChatRoomData(
       Clock clock,
       ChatMessageService service,
-      int bufferSize)
+      int historyLimit)
   {
-    log.info("Created ChatRoom with buffer-size {}", bufferSize);
+    log.info("Created ChatRoom with history-limit {}", historyLimit);
     this.clock = clock;
     this.service = service;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
     // @RequiredArgsConstructor unfortunately not possible, because
-    // the `bufferSize` is not set, if `createSink()` is called
+    // the `historyLimit` is not set, if `createSink()` is called
     // from the variable declaration!
     this.sink = createSink();
   }
@@ -106,11 +106,17 @@ public class ChatRoomData
     return service.getMessages(first, last);
   }
 
+  public void close()
+  {
+    log.info("{} is being closed", service.getChatRoomId());
+    sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
+  }
+
   private Sinks.Many<Message> createSink()
   {
     return Sinks
         .many()
-        .multicast()
-        .onBackpressureBuffer(bufferSize);
+        .replay()
+        .limit(historyLimit);
   }
 }