WIP
authorKai Moritz <kai@juplo.de>
Tue, 24 Jan 2023 17:43:01 +0000 (18:43 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 17:52:12 +0000 (18:52 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

index 04fbcfb..4fa567c 100644 (file)
@@ -58,8 +58,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
       long unseenOffset = offsets[partition];
 
       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
-      ChatRoomLoader loader = new ChatRoomLoader(partition, currentOffset, unseenOffset);
-      consumer.seek(tp, unseenOffset);
+      handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset);
     });
   }
 
@@ -135,23 +134,30 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   @RequiredArgsConstructor
   class NoOpMessageHandler implements MessageHandler
   {
-    private final int partition;
+    private final TopicPartition tp;
 
     @Override
     public MessageHandler handleMessage(Message message)
     {
-      log.warn("Not handling message {} for partition {}", message, partition);
+      log.warn("Not handling message {} for partition {}", message, tp);
       return this;
     }
   }
 
-  @RequiredArgsConstructor
-  class ChatRoomLoader implements MessageHandler
+  class ChatRoomLoadingMessageHandler implements MessageHandler
   {
-    private final int partition;
+    private final TopicPartition tp;
     private final long currentOffset;
     private final long unseenOffset;
 
+    ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
+    {
+      this.tp = tp;
+      this.currentOffset = currentOffset;
+      this.unseenOffset = unseenOffset;
+
+      consumer.seek(tp, unseenOffset);
+    }
 
     @Override
     public MessageHandler handleMessage(Message message)
@@ -160,4 +166,17 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
       return this;
     }
   }
+
+  @RequiredArgsConstructor
+  class DefaultMessageHandler implements MessageHandler
+  {
+    private final TopicPartition tp;
+
+    @Override
+    public MessageHandler handleMessage(Message message)
+    {
+      chatrooms[tp.partition()].put()
+      return this;
+    }
+  }
 }