From 465e722c9cd504932bc544c0f66e0b54007bdcdf Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sat, 9 Mar 2024 11:17:33 +0100
Subject: [PATCH] fix: GREEN - `ChatRoomData` obeys to the added expectations.

* Switched `ChatRoomData` from a multicast- to a replay-sink.
* Before, listening was implemented with a multicast-sink, that enabled
  back-pressure.
* Now, it was refactored to use a replay-sink, that enables a (configurable)
  limitted replay.
---
 .../chat/backend/ChatBackendProperties.java      |  2 +-
 .../kafka/chat/backend/domain/ChatRoomData.java  | 14 +++++++-------
 .../inmemory/InMemoryServicesConfiguration.java  |  4 ++--
 .../inmemory/SimpleChatHomeService.java          | 16 ++++++++--------
 .../implementation/kafka/DataChannel.java        | 10 +++++-----
 .../kafka/KafkaServicesConfiguration.java        |  2 +-
 6 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
index c3609a39..ba794523 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
@@ -17,7 +17,7 @@ public class ChatBackendProperties
 {
   private String instanceId = "DEV";
   private String allowedOrigins = "http://localhost:4200";
-  private int chatroomBufferSize = 1024;
+  private int chatroomHistoryLimit = 100;
   private ServiceType services = ServiceType.inmemory;
   private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
   private KafkaServicesProperties kafka = new KafkaServicesProperties();
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
index d783eb4c..9dbeda9e 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
@@ -21,21 +21,21 @@ public class ChatRoomData
 
   private final ChatMessageService service;
   private final Clock clock;
-  private final int bufferSize;
+  private final int historyLimit;
   private Sinks.Many<Message> sink;
 
 
   public ChatRoomData(
       Clock clock,
       ChatMessageService service,
-      int bufferSize)
+      int historyLimit)
   {
-    log.info("Created ChatRoom with buffer-size {}", bufferSize);
+    log.info("Created ChatRoom with history-limit {}", historyLimit);
     this.clock = clock;
     this.service = service;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
     // @RequiredArgsConstructor unfortunately not possible, because
-    // the `bufferSize` is not set, if `createSink()` is called
+    // the `historyLimit` is not set, if `createSink()` is called
     // from the variable declaration!
     this.sink = createSink();
   }
@@ -110,7 +110,7 @@ public class ChatRoomData
   {
     return Sinks
         .many()
-        .multicast()
-        .onBackpressureBuffer(bufferSize);
+        .replay()
+        .limit(historyLimit);
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
index 3f3d8884..43973ae0 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
@@ -33,7 +33,7 @@ public class InMemoryServicesConfiguration
   {
     SimpleChatHomeService chatHomeService = new SimpleChatHomeService(
         clock,
-        properties.getChatroomBufferSize());
+        properties.getChatroomHistoryLimit());
     chatHomeService.restore(storageStrategy).block();
     return chatHomeService;
   }
@@ -57,7 +57,7 @@ public class InMemoryServicesConfiguration
           SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService(
               shard,
               clock,
-              properties.getChatroomBufferSize());
+              properties.getChatroomHistoryLimit());
           service.restore(storageStrategy).block();
         });
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
index d568a9b4..2aac0fac 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
@@ -18,24 +18,24 @@ public class SimpleChatHomeService implements ChatHomeService
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
   private final Map<UUID, ChatRoomData> chatRoomData;
   private final Clock clock;
-  private final int bufferSize;
+  private final int historyLimit;
 
 
 
   public SimpleChatHomeService(
       Clock clock,
-      int bufferSize)
+      int historyLimit)
   {
     this(
         null,
         clock,
-        bufferSize);
+        historyLimit);
   }
 
   public SimpleChatHomeService(
       Integer shard,
       Clock clock,
-      int bufferSize)
+      int historyLimit)
   {
     log.debug("Creating SimpleChatHomeService");
 
@@ -43,7 +43,7 @@ public class SimpleChatHomeService implements ChatHomeService
     this.chatRoomInfo = new HashMap<>();
     this.chatRoomData = new HashMap<>();
     this.clock = clock;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
   }
 
 
@@ -81,7 +81,7 @@ public class SimpleChatHomeService implements ChatHomeService
               new ChatRoomData(
                   clock,
                   chatMessageService,
-                  bufferSize));
+                  historyLimit));
 
           return chatMessageService.restore(storageStrategy);
         })
@@ -95,11 +95,11 @@ public class SimpleChatHomeService implements ChatHomeService
   @Override
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
-    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    log.info("Creating ChatRoom with history-limit {}", historyLimit);
     ChatMessageService service = new InMemoryChatMessageService(id);
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
     this.chatRoomInfo.put(id, chatRoomInfo);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+    ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
     this.chatRoomData.put(id, chatRoomData);
     return Mono.just(chatRoomInfo);
   }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
index 2468af5a..63f36f53 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
@@ -36,7 +36,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
   private final ZoneId zoneId;
   private final int numShards;
   private final Duration pollingInterval;
-  private final int bufferSize;
+  private final int historyLimit;
   private final Clock clock;
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
@@ -58,7 +58,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     ZoneId zoneId,
     int numShards,
     Duration pollingInterval,
-    int bufferSize,
+    int historyLimit,
     Clock clock,
     ChannelMediator channelMediator,
     ShardingPublisherStrategy shardingPublisherStrategy)
@@ -75,7 +75,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     this.zoneId = zoneId;
     this.numShards = numShards;
     this.pollingInterval = pollingInterval;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
     this.clock = clock;
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
@@ -355,9 +355,9 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     }
     else
     {
-      log.info("Creating ChatRoomData {} with buffer-size {}", chatRoomId, bufferSize);
+      log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
       KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-      chatRoomData = new ChatRoomData(clock, service, bufferSize);
+      chatRoomData = new ChatRoomData(clock, service, historyLimit);
       this.chatRoomData[shard].put(chatRoomId, chatRoomData);
     }
 
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
index bebed14b..58e11173 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
@@ -150,7 +150,7 @@ public class KafkaServicesConfiguration
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getKafka().getPollingInterval(),
-        properties.getChatroomBufferSize(),
+        properties.getChatroomHistoryLimit(),
         clock,
         channelMediator,
         shardingPublisherStrategy);
-- 
2.20.1