WIP
authorKai Moritz <kai@juplo.de>
Mon, 23 Jan 2023 17:00:14 +0000 (18:00 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 17:52:12 +0000 (18:52 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

index 42ac943..b344a68 100644 (file)
@@ -15,25 +15,43 @@ import java.util.*;
 @Slf4j
 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
 {
-  private Consumer<String, MessageTo>
+  private final Consumer<String, MessageTo> consumer;
+  private final String topic;
   private final long[] offsets;
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
-  public KafkaChatHomeService(int numShards)
+  public KafkaChatHomeService(
+    Consumer<String, MessageTo> consumer,
+    String topic,
+    int numShards)
   {
     log.debug("Creating KafkaChatHomeService");
-    this.chatrooms = new Map[numShards];
+    this.consumer = consumer;
+    this.topic = topic;
     this.offsets = new long[numShards];
     for (int i=0; i< numShards; i++)
       this.offsets[i] = 0l;
+    this.chatrooms = new Map[numShards];
   }
 
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
-    log.info("Assigned partitions: {}", partitions);
+    consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
+    {
+      if (!tp.topic().equals(topic))
+      {
+        log.warn("Ignoring unwanted TopicPartition", tp);
+        return;
+      }
+
+      int partition = tp.partition();
+      long unseenOffset = offsets[partition];
+
+      log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+    });
   }
 
   @Override