From: Kai Moritz Date: Mon, 23 Jan 2023 17:00:14 +0000 (+0100) Subject: WIP X-Git-Tag: wip~4 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ce901580f30aad469ab18fcfda4920cf72030a84;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 42ac9430..b344a682 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -15,25 +15,43 @@ import java.util.*; @Slf4j public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener { - private Consumer + private final Consumer consumer; + private final String topic; private final long[] offsets; private final Map[] chatrooms; - public KafkaChatHomeService(int numShards) + public KafkaChatHomeService( + Consumer consumer, + String topic, + int numShards) { log.debug("Creating KafkaChatHomeService"); - this.chatrooms = new Map[numShards]; + this.consumer = consumer; + this.topic = topic; this.offsets = new long[numShards]; for (int i=0; i< numShards; i++) this.offsets[i] = 0l; + this.chatrooms = new Map[numShards]; } @Override public void onPartitionsAssigned(Collection partitions) { - log.info("Assigned partitions: {}", partitions); + consumer.endOffsets(partitions).forEach((tp, currentOffset) -> + { + if (!tp.topic().equals(topic)) + { + log.warn("Ignoring unwanted TopicPartition", tp); + return; + } + + int partition = tp.partition(); + long unseenOffset = offsets[partition]; + + log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset); + }); } @Override