TMP:test -- FIX: `ChatRoomData` active/inactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index 2468af5..32a5720 100644 (file)
@@ -36,7 +36,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
   private final ZoneId zoneId;
   private final int numShards;
   private final Duration pollingInterval;
-  private final int bufferSize;
+  private final int historyLimit;
   private final Clock clock;
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
@@ -58,7 +58,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     ZoneId zoneId,
     int numShards,
     Duration pollingInterval,
-    int bufferSize,
+    int historyLimit,
     Clock clock,
     ChannelMediator channelMediator,
     ShardingPublisherStrategy shardingPublisherStrategy)
@@ -75,7 +75,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     this.zoneId = zoneId;
     this.numShards = numShards;
     this.pollingInterval = pollingInterval;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
     this.clock = clock;
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
@@ -144,6 +144,10 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       isShardOwned[partition] =  true;
       this.currentOffset[partition] = currentOffset;
 
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.activate());
+
       log.info(
           "Partition assigned: {} - loading messages: next={} -> current={}",
           partition,
@@ -178,7 +182,13 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       int partition = topicPartition.partition();
       isShardOwned[partition] = false;
       nextOffset[partition] = consumer.position(topicPartition);
+
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.deactivate());
+
       channelMediator.shardRevoked(partition);
     });
   }
@@ -324,7 +334,10 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
 
   void createChatRoomData(ChatRoomInfo chatRoomInfo)
   {
-    computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard());
+    ChatRoomData chatRoomData = computeChatRoomData(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getShard());
+    chatRoomData.activate();
   }
 
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
@@ -355,9 +368,9 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     }
     else
     {
-      log.info("Creating ChatRoomData {} with buffer-size {}", chatRoomId, bufferSize);
+      log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
       KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-      chatRoomData = new ChatRoomData(clock, service, bufferSize);
+      chatRoomData = new ChatRoomData(clock, service, historyLimit);
       this.chatRoomData[shard].put(chatRoomId, chatRoomData);
     }