WIP
authorKai Moritz <kai@juplo.de>
Mon, 23 Jan 2023 16:32:56 +0000 (17:32 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 17:52:12 +0000 (18:52 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

index 0e18f68..cdeccec 100644 (file)
@@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -10,18 +13,43 @@ import java.util.*;
 
 
 @Slf4j
-public class KafkaChatHomeService implements ChatHomeService
+public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
 {
+  private Consumer<String, >
+  private final long[] offsets;
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
-  public KafkaChatHomeService(
-    int numShards,
-    int[] ownedShards,
-    Flux<ChatRoom> chatroomFlux)
+  public KafkaChatHomeService(int numShards)
   {
-    log.debug("Creating ChatHomeService");
+    log.debug("Creating KafkaChatHomeService");
     this.chatrooms = new Map[numShards];
+    this.offsets = new long[numShards];
+    for (int i=0; i< numShards; i++)
+      this.offsets[i] = 0l;
+  }
+
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    log.info("Assigned partitions: {}", partitions);
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    log.info("Revoked partitions: {}", partitions);
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    log.info("Revoked partitions: {}", partitions);
+  }
+
+  private void foo()
+  {
     Set<Integer> owned = Arrays
       .stream(ownedShards)
       .collect(