TMP:test -- FIX: `ChatRoomData` active/inactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index 8dcc1bc..32a5720 100644 (file)
@@ -144,6 +144,10 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       isShardOwned[partition] =  true;
       this.currentOffset[partition] = currentOffset;
 
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.activate());
+
       log.info(
           "Partition assigned: {} - loading messages: next={} -> current={}",
           partition,
@@ -176,12 +180,15 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     partitions.forEach(topicPartition ->
     {
       int partition = topicPartition.partition();
-      chatRoomData[partition]
-          .values()
-          .forEach(chatRoomData -> chatRoomData.close());
       isShardOwned[partition] = false;
       nextOffset[partition] = consumer.position(topicPartition);
+
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.deactivate());
+
       channelMediator.shardRevoked(partition);
     });
   }
@@ -327,7 +334,10 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
 
   void createChatRoomData(ChatRoomInfo chatRoomInfo)
   {
-    computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard());
+    ChatRoomData chatRoomData = computeChatRoomData(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getShard());
+    chatRoomData.activate();
   }
 
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)