TMP:test:FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index 32a5720..ae544e4 100644 (file)
@@ -144,10 +144,6 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       isShardOwned[partition] =  true;
       this.currentOffset[partition] = currentOffset;
 
-      chatRoomData[partition]
-          .values()
-          .forEach(chatRoomData -> chatRoomData.activate());
-
       log.info(
           "Partition assigned: {} - loading messages: next={} -> current={}",
           partition,
@@ -223,6 +219,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
             {
               log.info("Loading of messages completed! Pausing all owned partitions...");
               pauseAllOwnedPartions();
+              activateAllOwnedChatRooms();
               log.info("Resuming normal operations...");
               channelState = ChannelState.READY;
             }
@@ -323,6 +320,16 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
         .toList());
   }
 
+  private void activateAllOwnedChatRooms()
+  {
+    IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .forEach(shard -> chatRoomData[shard]
+            .values()
+            .forEach(chatRoomData -> chatRoomData.activate()));
+  }
+
 
   int[] getOwnedShards()
   {