From: Kai Moritz Date: Sat, 15 Apr 2023 12:48:22 +0000 (+0200) Subject: NEU X-Git-Tag: kafkadata~22 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3ff99ab559bf06b97c4c282a7933df951cbeaa8f;p=demos%2Fkafka%2Fchat NEU --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 912295d6..b4dccbd1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -19,6 +19,7 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; import java.util.concurrent.ExecutorService; +import java.util.stream.IntStream; @Slf4j @@ -123,6 +124,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer { for (ConsumerRecord record : records) { + nextOffset[record.partition()] = record.offset() + 1; UUID chatRoomId = UUID.fromString(record.key()); MessageTo messageTo = record.value(); @@ -139,6 +141,24 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer kafkaChatRoomService.persistMessage(message); } + + if (IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard]) + .collect( + () -> Boolean.TRUE, + (acc, v) -> Boolean.valueOf(acc && v), + (a, b) -> Boolean.valueOf(a && b))) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + consumer.pause(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> new TopicPartition(topic, shard)) + .toList()); + loadInProgress = false; + } } else {