WIP
authorKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 14:48:25 +0000 (15:48 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 14:48:25 +0000 (15:48 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

index 2e3b42f..b2ed284 100644 (file)
@@ -18,7 +18,6 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   private final Consumer<String, MessageTo> consumer;
   private final String topic;
   private final long[] offsets;
-  private final MessageHandler[] handlers;
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
@@ -31,13 +30,14 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     this.consumer = consumer;
     this.topic = topic;
     this.offsets = new long[numShards];
-    this.handlers = new MessageHandler[numShards];
+    this.chatrooms = new Map<>[numShards];
     for (int i=0; i< numShards; i++)
     {
       this.offsets[i] = 0l;
-      this.handlers[i] = new MessageHandler(consumer, new TopicPartition(topic, i));
+      this.chatrooms[i] = chat
     }
     this.chatrooms = new Map[numShards];
+    this.chatrooms = new Map[numShards];
   }
 
 
@@ -55,7 +55,11 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
       int partition = tp.partition();
       long unseenOffset = offsets[partition];
 
-      log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+      log.info(
+          "Loading messages from partition {}: start-offset={} -> current-offset={}",
+          partition,
+          unseenOffset,
+          currentOffset);
       handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
     });
   }