fix: GREEN - `ChatRoomData` obeys to the added expectations.
authorKai Moritz <kai@juplo.de>
Sat, 9 Mar 2024 10:17:33 +0000 (11:17 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 16 Mar 2024 09:10:54 +0000 (10:10 +0100)
* Switched `ChatRoomData` from a multicast- to a replay-sink.
* Before, listening was implemented with a multicast-sink, that enabled
  back-pressure.
* Now, it was refactored to use a replay-sink, that enables a (configurable)
  limitted replay.

src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

index c3609a3..ba79452 100644 (file)
@@ -17,7 +17,7 @@ public class ChatBackendProperties
 {
   private String instanceId = "DEV";
   private String allowedOrigins = "http://localhost:4200";
-  private int chatroomBufferSize = 1024;
+  private int chatroomHistoryLimit = 100;
   private ServiceType services = ServiceType.inmemory;
   private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
   private KafkaServicesProperties kafka = new KafkaServicesProperties();
index d783eb4..9dbeda9 100644 (file)
@@ -21,21 +21,21 @@ public class ChatRoomData
 
   private final ChatMessageService service;
   private final Clock clock;
-  private final int bufferSize;
+  private final int historyLimit;
   private Sinks.Many<Message> sink;
 
 
   public ChatRoomData(
       Clock clock,
       ChatMessageService service,
-      int bufferSize)
+      int historyLimit)
   {
-    log.info("Created ChatRoom with buffer-size {}", bufferSize);
+    log.info("Created ChatRoom with history-limit {}", historyLimit);
     this.clock = clock;
     this.service = service;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
     // @RequiredArgsConstructor unfortunately not possible, because
-    // the `bufferSize` is not set, if `createSink()` is called
+    // the `historyLimit` is not set, if `createSink()` is called
     // from the variable declaration!
     this.sink = createSink();
   }
@@ -110,7 +110,7 @@ public class ChatRoomData
   {
     return Sinks
         .many()
-        .multicast()
-        .onBackpressureBuffer(bufferSize);
+        .replay()
+        .limit(historyLimit);
   }
 }
index 3f3d888..43973ae 100644 (file)
@@ -33,7 +33,7 @@ public class InMemoryServicesConfiguration
   {
     SimpleChatHomeService chatHomeService = new SimpleChatHomeService(
         clock,
-        properties.getChatroomBufferSize());
+        properties.getChatroomHistoryLimit());
     chatHomeService.restore(storageStrategy).block();
     return chatHomeService;
   }
@@ -57,7 +57,7 @@ public class InMemoryServicesConfiguration
           SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService(
               shard,
               clock,
-              properties.getChatroomBufferSize());
+              properties.getChatroomHistoryLimit());
           service.restore(storageStrategy).block();
         });
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
index d568a9b..2aac0fa 100644 (file)
@@ -18,24 +18,24 @@ public class SimpleChatHomeService implements ChatHomeService
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
   private final Map<UUID, ChatRoomData> chatRoomData;
   private final Clock clock;
-  private final int bufferSize;
+  private final int historyLimit;
 
 
 
   public SimpleChatHomeService(
       Clock clock,
-      int bufferSize)
+      int historyLimit)
   {
     this(
         null,
         clock,
-        bufferSize);
+        historyLimit);
   }
 
   public SimpleChatHomeService(
       Integer shard,
       Clock clock,
-      int bufferSize)
+      int historyLimit)
   {
     log.debug("Creating SimpleChatHomeService");
 
@@ -43,7 +43,7 @@ public class SimpleChatHomeService implements ChatHomeService
     this.chatRoomInfo = new HashMap<>();
     this.chatRoomData = new HashMap<>();
     this.clock = clock;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
   }
 
 
@@ -81,7 +81,7 @@ public class SimpleChatHomeService implements ChatHomeService
               new ChatRoomData(
                   clock,
                   chatMessageService,
-                  bufferSize));
+                  historyLimit));
 
           return chatMessageService.restore(storageStrategy);
         })
@@ -95,11 +95,11 @@ public class SimpleChatHomeService implements ChatHomeService
   @Override
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
-    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    log.info("Creating ChatRoom with history-limit {}", historyLimit);
     ChatMessageService service = new InMemoryChatMessageService(id);
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
     this.chatRoomInfo.put(id, chatRoomInfo);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+    ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
     this.chatRoomData.put(id, chatRoomData);
     return Mono.just(chatRoomInfo);
   }
index 2468af5..63f36f5 100644 (file)
@@ -36,7 +36,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
   private final ZoneId zoneId;
   private final int numShards;
   private final Duration pollingInterval;
-  private final int bufferSize;
+  private final int historyLimit;
   private final Clock clock;
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
@@ -58,7 +58,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     ZoneId zoneId,
     int numShards,
     Duration pollingInterval,
-    int bufferSize,
+    int historyLimit,
     Clock clock,
     ChannelMediator channelMediator,
     ShardingPublisherStrategy shardingPublisherStrategy)
@@ -75,7 +75,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     this.zoneId = zoneId;
     this.numShards = numShards;
     this.pollingInterval = pollingInterval;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
     this.clock = clock;
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
@@ -355,9 +355,9 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     }
     else
     {
-      log.info("Creating ChatRoomData {} with buffer-size {}", chatRoomId, bufferSize);
+      log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
       KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-      chatRoomData = new ChatRoomData(clock, service, bufferSize);
+      chatRoomData = new ChatRoomData(clock, service, historyLimit);
       this.chatRoomData[shard].put(chatRoomId, chatRoomData);
     }
 
index bebed14..58e1117 100644 (file)
@@ -150,7 +150,7 @@ public class KafkaServicesConfiguration
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getKafka().getPollingInterval(),
-        properties.getChatroomBufferSize(),
+        properties.getChatroomHistoryLimit(),
         clock,
         channelMediator,
         shardingPublisherStrategy);