TMP:test:FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index 63f36f5..ae544e4 100644 (file)
@@ -178,7 +178,13 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       int partition = topicPartition.partition();
       isShardOwned[partition] = false;
       nextOffset[partition] = consumer.position(topicPartition);
+
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.deactivate());
+
       channelMediator.shardRevoked(partition);
     });
   }
@@ -213,6 +219,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
             {
               log.info("Loading of messages completed! Pausing all owned partitions...");
               pauseAllOwnedPartions();
+              activateAllOwnedChatRooms();
               log.info("Resuming normal operations...");
               channelState = ChannelState.READY;
             }
@@ -313,6 +320,16 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
         .toList());
   }
 
+  private void activateAllOwnedChatRooms()
+  {
+    IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .forEach(shard -> chatRoomData[shard]
+            .values()
+            .forEach(chatRoomData -> chatRoomData.activate()));
+  }
+
 
   int[] getOwnedShards()
   {
@@ -324,7 +341,10 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
 
   void createChatRoomData(ChatRoomInfo chatRoomInfo)
   {
-    computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard());
+    ChatRoomData chatRoomData = computeChatRoomData(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getShard());
+    chatRoomData.activate();
   }
 
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)