WIP
authorKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 18:21:47 +0000 (19:21 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 18:21:47 +0000 (19:21 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

index e171bc5..7ebf049 100644 (file)
@@ -5,10 +5,12 @@ import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.time.ZoneId;
 import java.util.*;
 
 
@@ -16,26 +18,32 @@ import java.util.*;
 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
 {
   private final Consumer<String, MessageTo> consumer;
+  private final Producer<String, MessageTo> producer;
   private final String topic;
+  private final ZoneId zoneId;
   // private final long[] offsets; Erst mal immer alles neu einlesen
-  private final Map<UUID, KafkaChatRoomService>[] kafkaChatRoomServiceMaps;
+  private final ChatHomeLoader[] chatHomeLoaders;
   private final Map<UUID, ChatRoom>[] chatRoomMaps;
 
 
   public KafkaChatHomeService(
     Consumer<String, MessageTo> consumer,
+    Producer<String, MessageTo> producer,
     String topic,
+    ZoneId zoneId,
     int numShards)
   {
     log.debug("Creating KafkaChatHomeService");
     this.consumer = consumer;
+    this.producer = producer;
     this.topic = topic;
+    this.zoneId = zoneId;
     // this.offsets = new long[numShards];
     // for (int i=0; i< numShards; i++)
     // {
     //   this.offsets[i] = 0l;
     // }
-    this.kafkaChatRoomServiceMaps = new Map[numShards];
+    this.chatHomeLoaders = new ChatHomeLoader[numShards];
     this.chatRoomMaps = new Map[numShards];
   }
 
@@ -43,16 +51,15 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
-    consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
+    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
     {
-      if (!tp.topic().equals(topic))
+      if (!topicPartition.topic().equals(topic))
       {
-        log.warn("Ignoring partition from unwanted topic: {}", tp);
+        log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
         return;
       }
 
-      int partition = tp.partition();
-      kafkaChatRoomServiceMaps[partition] = new HashMap<>(); // TODO: reuse! Nicht immer alles neu laden
+      int partition = topicPartition.partition();
       long unseenOffset = 0; // offsets[partition];
 
       log.info(
@@ -61,11 +68,12 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
           unseenOffset,
           currentOffset);
 
-      consumer.seek(tp, unseenOffset);
-      chatRoomMaps[partition]
-          .values()
-          .stream()
-      handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
+      // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
+      consumer.seek(topicPartition, unseenOffset);
+      chatHomeLoaders[partition] = new ChatHomeLoader(
+          producer,
+          currentOffset,
+          zoneId);
     });
   }