import java.time.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
+import java.util.stream.IntStream;
@Slf4j
{
for (ConsumerRecord<String, MessageTo> record : records)
{
+ nextOffset[record.partition()] = record.offset() + 1;
UUID chatRoomId = UUID.fromString(record.key());
MessageTo messageTo = record.value();
kafkaChatRoomService.persistMessage(message);
}
+
+ if (IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
+ .collect(
+ () -> Boolean.TRUE,
+ (acc, v) -> Boolean.valueOf(acc && v),
+ (a, b) -> Boolean.valueOf(a && b)))
+ {
+ log.info("Loading of messages completed! Pausing all owned partitions...");
+ consumer.pause(IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .mapToObj(shard -> new TopicPartition(topic, shard))
+ .toList());
+ loadInProgress = false;
+ }
}
else
{