WIP
authorKai Moritz <kai@juplo.de>
Tue, 24 Jan 2023 17:33:13 +0000 (18:33 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 17:52:12 +0000 (18:52 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

index 4a87d31..04fbcfb 100644 (file)
@@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -18,6 +20,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   private final Consumer<String, MessageTo> consumer;
   private final String topic;
   private final long[] offsets;
+  private final MessageHandler[] handlers;
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
@@ -30,8 +33,12 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     this.consumer = consumer;
     this.topic = topic;
     this.offsets = new long[numShards];
+    this.handlers = new MessageHandler[numShards];
     for (int i=0; i< numShards; i++)
+    {
       this.offsets[i] = 0l;
+      this.handlers[i] = new NoOpMessageHandler(i);
+    }
     this.chatrooms = new Map[numShards];
   }
 
@@ -51,6 +58,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
       long unseenOffset = offsets[partition];
 
       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+      ChatRoomLoader loader = new ChatRoomLoader(partition, currentOffset, unseenOffset);
       consumer.seek(tp, unseenOffset);
     });
   }
@@ -116,4 +124,40 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
+
+
+  interface MessageHandler
+  {
+    MessageHandler handleMessage(Message message);
+  }
+
+
+  @RequiredArgsConstructor
+  class NoOpMessageHandler implements MessageHandler
+  {
+    private final int partition;
+
+    @Override
+    public MessageHandler handleMessage(Message message)
+    {
+      log.warn("Not handling message {} for partition {}", message, partition);
+      return this;
+    }
+  }
+
+  @RequiredArgsConstructor
+  class ChatRoomLoader implements MessageHandler
+  {
+    private final int partition;
+    private final long currentOffset;
+    private final long unseenOffset;
+
+
+    @Override
+    public MessageHandler handleMessage(Message message)
+    {
+      // todo
+      return this;
+    }
+  }
 }