WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
index 4fa567c..2e3b42f 100644 (file)
@@ -2,8 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -37,7 +35,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     for (int i=0; i< numShards; i++)
     {
       this.offsets[i] = 0l;
-      this.handlers[i] = new NoOpMessageHandler(i);
+      this.handlers[i] = new MessageHandler(consumer, new TopicPartition(topic, i));
     }
     this.chatrooms = new Map[numShards];
   }
@@ -50,7 +48,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     {
       if (!tp.topic().equals(topic))
       {
-        log.warn("Ignoring unwanted TopicPartition", tp);
+        log.warn("Ignoring partition from unwanted topic: {}", tp);
         return;
       }
 
@@ -58,13 +56,26 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
       long unseenOffset = offsets[partition];
 
       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
-      handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset);
+      handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
     });
   }
 
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
+    partitions.forEach(tp ->
+    {
+      if (!tp.topic().equals(topic))
+      {
+        log.warn("Ignoring partition from unwanted topic: {}", tp);
+        return;
+      }
+
+      int partition = tp.partition();
+      long unseenOffset = offsets[partition];
+
+      log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+    });
     log.info("Revoked partitions: {}", partitions);
   }
 
@@ -123,60 +134,4 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
-
-
-  interface MessageHandler
-  {
-    MessageHandler handleMessage(Message message);
-  }
-
-
-  @RequiredArgsConstructor
-  class NoOpMessageHandler implements MessageHandler
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandler handleMessage(Message message)
-    {
-      log.warn("Not handling message {} for partition {}", message, tp);
-      return this;
-    }
-  }
-
-  class ChatRoomLoadingMessageHandler implements MessageHandler
-  {
-    private final TopicPartition tp;
-    private final long currentOffset;
-    private final long unseenOffset;
-
-    ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
-    {
-      this.tp = tp;
-      this.currentOffset = currentOffset;
-      this.unseenOffset = unseenOffset;
-
-      consumer.seek(tp, unseenOffset);
-    }
-
-    @Override
-    public MessageHandler handleMessage(Message message)
-    {
-      // todo
-      return this;
-    }
-  }
-
-  @RequiredArgsConstructor
-  class DefaultMessageHandler implements MessageHandler
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandler handleMessage(Message message)
-    {
-      chatrooms[tp.partition()].put()
-      return this;
-    }
-  }
 }