WIP
authorKai Moritz <kai@juplo.de>
Fri, 17 Feb 2023 18:07:11 +0000 (19:07 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 11:14:59 +0000 (12:14 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

index 4fa567c..556a226 100644 (file)
@@ -37,7 +37,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     for (int i=0; i< numShards; i++)
     {
       this.offsets[i] = 0l;
-      this.handlers[i] = new NoOpMessageHandler(i);
+      this.handlers[i] = new MessageHandler(new TopicPartition(topic, i));
     }
     this.chatrooms = new Map[numShards];
   }
@@ -50,7 +50,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     {
       if (!tp.topic().equals(topic))
       {
-        log.warn("Ignoring unwanted TopicPartition", tp);
+        log.warn("Ignoring partition from unwanted topic: {}", tp);
         return;
       }
 
@@ -58,13 +58,26 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
       long unseenOffset = offsets[partition];
 
       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
-      handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset);
+      handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
     });
   }
 
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
+    partitions.forEach(tp ->
+    {
+      if (!tp.topic().equals(topic))
+      {
+        log.warn("Ignoring partition from unwanted topic: {}", tp);
+        return;
+      }
+
+      int partition = tp.partition();
+      long unseenOffset = offsets[partition];
+
+      log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+    });
     log.info("Revoked partitions: {}", partitions);
   }
 
@@ -125,32 +138,37 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   }
 
 
-  interface MessageHandler
+  class MessageHandler
+  {
+
+  }
+
+  interface MessageHandlingStrategy
   {
-    MessageHandler handleMessage(Message message);
+    MessageHandlingStrategy handleMessage(Message message);
   }
 
 
   @RequiredArgsConstructor
-  class NoOpMessageHandler implements MessageHandler
+  class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
   {
     private final TopicPartition tp;
 
     @Override
-    public MessageHandler handleMessage(Message message)
+    public MessageHandlingStrategy handleMessage(Message message)
     {
       log.warn("Not handling message {} for partition {}", message, tp);
       return this;
     }
   }
 
-  class ChatRoomLoadingMessageHandler implements MessageHandler
+  class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
   {
     private final TopicPartition tp;
     private final long currentOffset;
     private final long unseenOffset;
 
-    ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
+    ChatRoomLoadingMessageHandlingStrategy(TopicPartition tp, long currentOffset, long unseenOffset)
     {
       this.tp = tp;
       this.currentOffset = currentOffset;
@@ -160,7 +178,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     }
 
     @Override
-    public MessageHandler handleMessage(Message message)
+    public MessageHandlingStrategy handleMessage(Message message)
     {
       // todo
       return this;
@@ -168,12 +186,12 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   }
 
   @RequiredArgsConstructor
-  class DefaultMessageHandler implements MessageHandler
+  class DefaultMessageHandlingStrategy implements MessageHandlingStrategy
   {
     private final TopicPartition tp;
 
     @Override
-    public MessageHandler handleMessage(Message message)
+    public MessageHandlingStrategy handleMessage(Message message)
     {
       chatrooms[tp.partition()].put()
       return this;