NEU
authorKai Moritz <kai@juplo.de>
Sat, 15 Apr 2023 12:48:22 +0000 (14:48 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Apr 2023 12:48:22 +0000 (14:48 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

index 912295d..b4dccbd 100644 (file)
@@ -19,6 +19,7 @@ import reactor.core.publisher.Mono;
 import java.time.*;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.IntStream;
 
 
 @Slf4j
@@ -123,6 +124,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
         {
           for (ConsumerRecord<String, MessageTo> record : records)
           {
+            nextOffset[record.partition()] = record.offset() + 1;
             UUID chatRoomId = UUID.fromString(record.key());
             MessageTo messageTo = record.value();
 
@@ -139,6 +141,24 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
 
             kafkaChatRoomService.persistMessage(message);
           }
+
+          if (IntStream
+            .range(0, numShards)
+            .filter(shard -> isShardOwned[shard])
+            .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
+            .collect(
+                () -> Boolean.TRUE,
+                (acc, v) -> Boolean.valueOf(acc && v),
+                (a, b) -> Boolean.valueOf(a && b)))
+          {
+            log.info("Loading of messages completed! Pausing all owned partitions...");
+            consumer.pause(IntStream
+                .range(0, numShards)
+                .filter(shard -> isShardOwned[shard])
+                .mapToObj(shard -> new TopicPartition(topic, shard))
+                .toList());
+            loadInProgress = false;
+          }
         }
         else
         {