WIP:fix:activation
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index ae544e4..abe51f4 100644 (file)
@@ -341,10 +341,17 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
 
   void createChatRoomData(ChatRoomInfo chatRoomInfo)
   {
+    int shard = chatRoomInfo.getShard();
+
     ChatRoomData chatRoomData = computeChatRoomData(
         chatRoomInfo.getId(),
         chatRoomInfo.getShard());
-    chatRoomData.activate();
+
+    // TODO: Possible race-condition in case of an ongoing rebalance!
+    if (isShardOwned[shard])
+    {
+      chatRoomData.activate();
+    }
   }
 
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)