TMP:test:FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index 2468af5..ae544e4 100644 (file)
@@ -36,7 +36,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
   private final ZoneId zoneId;
   private final int numShards;
   private final Duration pollingInterval;
-  private final int bufferSize;
+  private final int historyLimit;
   private final Clock clock;
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
@@ -58,7 +58,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     ZoneId zoneId,
     int numShards,
     Duration pollingInterval,
-    int bufferSize,
+    int historyLimit,
     Clock clock,
     ChannelMediator channelMediator,
     ShardingPublisherStrategy shardingPublisherStrategy)
@@ -75,7 +75,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     this.zoneId = zoneId;
     this.numShards = numShards;
     this.pollingInterval = pollingInterval;
-    this.bufferSize = bufferSize;
+    this.historyLimit = historyLimit;
     this.clock = clock;
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
@@ -178,7 +178,13 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       int partition = topicPartition.partition();
       isShardOwned[partition] = false;
       nextOffset[partition] = consumer.position(topicPartition);
+
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.deactivate());
+
       channelMediator.shardRevoked(partition);
     });
   }
@@ -213,6 +219,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
             {
               log.info("Loading of messages completed! Pausing all owned partitions...");
               pauseAllOwnedPartions();
+              activateAllOwnedChatRooms();
               log.info("Resuming normal operations...");
               channelState = ChannelState.READY;
             }
@@ -313,6 +320,16 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
         .toList());
   }
 
+  private void activateAllOwnedChatRooms()
+  {
+    IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .forEach(shard -> chatRoomData[shard]
+            .values()
+            .forEach(chatRoomData -> chatRoomData.activate()));
+  }
+
 
   int[] getOwnedShards()
   {
@@ -324,7 +341,10 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
 
   void createChatRoomData(ChatRoomInfo chatRoomInfo)
   {
-    computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard());
+    ChatRoomData chatRoomData = computeChatRoomData(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getShard());
+    chatRoomData.activate();
   }
 
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
@@ -355,9 +375,9 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     }
     else
     {
-      log.info("Creating ChatRoomData {} with buffer-size {}", chatRoomId, bufferSize);
+      log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
       KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-      chatRoomData = new ChatRoomData(clock, service, bufferSize);
+      chatRoomData = new ChatRoomData(clock, service, historyLimit);
       this.chatRoomData[shard].put(chatRoomId, chatRoomData);
     }