TMP:test:FIX
authorKai Moritz <kai@juplo.de>
Tue, 12 Mar 2024 08:54:00 +0000 (09:54 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 16 Mar 2024 09:45:41 +0000 (10:45 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index ca33aca..20c046d 100644 (file)
@@ -24,7 +24,7 @@ public class ChatRoomData
   private final Clock clock;
   private final int historyLimit;
   private Sinks.Many<Message> sink;
-  private volatile boolean active = true;
+  private volatile boolean active = false;
 
 
   public ChatRoomData(
@@ -114,6 +114,12 @@ public class ChatRoomData
 
   public void activate()
   {
+    if (active)
+    {
+      log.info("{} is already active!", service.getChatRoomId());
+      return;
+    }
+
     log.info("{} is being activated", service.getChatRoomId());
     this.sink = createSink();
     active = true;
index 32a5720..ae544e4 100644 (file)
@@ -144,10 +144,6 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       isShardOwned[partition] =  true;
       this.currentOffset[partition] = currentOffset;
 
-      chatRoomData[partition]
-          .values()
-          .forEach(chatRoomData -> chatRoomData.activate());
-
       log.info(
           "Partition assigned: {} - loading messages: next={} -> current={}",
           partition,
@@ -223,6 +219,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
             {
               log.info("Loading of messages completed! Pausing all owned partitions...");
               pauseAllOwnedPartions();
+              activateAllOwnedChatRooms();
               log.info("Resuming normal operations...");
               channelState = ChannelState.READY;
             }
@@ -323,6 +320,16 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
         .toList());
   }
 
+  private void activateAllOwnedChatRooms()
+  {
+    IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .forEach(shard -> chatRoomData[shard]
+            .values()
+            .forEach(chatRoomData -> chatRoomData.activate()));
+  }
+
 
   int[] getOwnedShards()
   {