refactor: compute `loadInProgress` on offset-change
authorKai Moritz <kai@juplo.de>
Sat, 3 Feb 2024 14:32:25 +0000 (15:32 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 09:28:35 +0000 (10:28 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java

index 3fe15c4..e6c18c7 100644 (file)
@@ -5,6 +5,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessage
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -38,6 +39,8 @@ public class InfoChannel implements Runnable
   private final String instanceUri;
 
   private boolean running;
+  @Getter
+  private volatile boolean loadInProgress = true;
 
 
   public InfoChannel(
@@ -68,13 +71,6 @@ public class InfoChannel implements Runnable
   }
 
 
-  boolean isLoadInProgress()
-  {
-    return IntStream
-        .range(0, numShards)
-        .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
-  }
-
   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
       UUID chatRoomId,
       String name,
@@ -189,6 +185,7 @@ public class InfoChannel implements Runnable
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = 0l);
+    loadInProgress = true;
 
     while (running)
     {
@@ -215,6 +212,11 @@ public class InfoChannel implements Runnable
   private void updateNextOffset(int partition, long nextOffset)
   {
     this.nextOffset[partition] = nextOffset;
+    if (loadInProgress) {
+      loadInProgress = IntStream
+          .range(0, numShards)
+          .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+    }
   }
 
   private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)