From ce901580f30aad469ab18fcfda4920cf72030a84 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 23 Jan 2023 18:00:14 +0100 Subject: [PATCH] WIP --- .../kafka/KafkaChatHomeService.java | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 42ac9430..b344a682 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -15,25 +15,43 @@ import java.util.*; @Slf4j public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener { - private Consumer + private final Consumer consumer; + private final String topic; private final long[] offsets; private final Map[] chatrooms; - public KafkaChatHomeService(int numShards) + public KafkaChatHomeService( + Consumer consumer, + String topic, + int numShards) { log.debug("Creating KafkaChatHomeService"); - this.chatrooms = new Map[numShards]; + this.consumer = consumer; + this.topic = topic; this.offsets = new long[numShards]; for (int i=0; i< numShards; i++) this.offsets[i] = 0l; + this.chatrooms = new Map[numShards]; } @Override public void onPartitionsAssigned(Collection partitions) { - log.info("Assigned partitions: {}", partitions); + consumer.endOffsets(partitions).forEach((tp, currentOffset) -> + { + if (!tp.topic().equals(topic)) + { + log.warn("Ignoring unwanted TopicPartition", tp); + return; + } + + int partition = tp.partition(); + long unseenOffset = offsets[partition]; + + log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset); + }); } @Override -- 2.20.1