From 3ff99ab559bf06b97c4c282a7933df951cbeaa8f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 15 Apr 2023 14:48:22 +0200 Subject: [PATCH] NEU --- .../kafka/KafkaChatHomeService.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 912295d6..b4dccbd1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -19,6 +19,7 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; import java.util.concurrent.ExecutorService; +import java.util.stream.IntStream; @Slf4j @@ -123,6 +124,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer { for (ConsumerRecord record : records) { + nextOffset[record.partition()] = record.offset() + 1; UUID chatRoomId = UUID.fromString(record.key()); MessageTo messageTo = record.value(); @@ -139,6 +141,24 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer kafkaChatRoomService.persistMessage(message); } + + if (IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard]) + .collect( + () -> Boolean.TRUE, + (acc, v) -> Boolean.valueOf(acc && v), + (a, b) -> Boolean.valueOf(a && b))) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + consumer.pause(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> new TopicPartition(topic, shard)) + .toList()); + loadInProgress = false; + } } else { -- 2.20.1