WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
index 4fa567c..a5d63fd 100644 (file)
@@ -2,8 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -19,8 +17,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
 {
   private final Consumer<String, MessageTo> consumer;
   private final String topic;
-  private final long[] offsets;
-  private final MessageHandler[] handlers;
+  // private final long[] offsets; Erst mal immer alles neu einlesen
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
@@ -32,13 +29,11 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     log.debug("Creating KafkaChatHomeService");
     this.consumer = consumer;
     this.topic = topic;
-    this.offsets = new long[numShards];
-    this.handlers = new MessageHandler[numShards];
-    for (int i=0; i< numShards; i++)
-    {
-      this.offsets[i] = 0l;
-      this.handlers[i] = new NoOpMessageHandler(i);
-    }
+    // this.offsets = new long[numShards];
+    // for (int i=0; i< numShards; i++)
+    // {
+    //   this.offsets[i] = 0l;
+    // }
     this.chatrooms = new Map[numShards];
   }
 
@@ -50,21 +45,43 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     {
       if (!tp.topic().equals(topic))
       {
-        log.warn("Ignoring unwanted TopicPartition", tp);
+        log.warn("Ignoring partition from unwanted topic: {}", tp);
         return;
       }
 
       int partition = tp.partition();
-      long unseenOffset = offsets[partition];
+      long unseenOffset = 0; // offsets[partition];
 
-      log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
-      handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset);
+      log.info(
+          "Loading messages from partition {}: start-offset={} -> current-offset={}",
+          partition,
+          unseenOffset,
+          currentOffset);
+
+      consumer.seek(tp, unseenOffset);
+      chatrooms[partition]
+          .values()
+          .stream()
+      handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
     });
   }
 
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
+    partitions.forEach(tp ->
+    {
+      if (!tp.topic().equals(topic))
+      {
+        log.warn("Ignoring partition from unwanted topic: {}", tp);
+        return;
+      }
+
+      int partition = tp.partition();
+      long unseenOffset = offsets[partition];
+
+      log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+    });
     log.info("Revoked partitions: {}", partitions);
   }
 
@@ -123,60 +140,4 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
-
-
-  interface MessageHandler
-  {
-    MessageHandler handleMessage(Message message);
-  }
-
-
-  @RequiredArgsConstructor
-  class NoOpMessageHandler implements MessageHandler
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandler handleMessage(Message message)
-    {
-      log.warn("Not handling message {} for partition {}", message, tp);
-      return this;
-    }
-  }
-
-  class ChatRoomLoadingMessageHandler implements MessageHandler
-  {
-    private final TopicPartition tp;
-    private final long currentOffset;
-    private final long unseenOffset;
-
-    ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
-    {
-      this.tp = tp;
-      this.currentOffset = currentOffset;
-      this.unseenOffset = unseenOffset;
-
-      consumer.seek(tp, unseenOffset);
-    }
-
-    @Override
-    public MessageHandler handleMessage(Message message)
-    {
-      // todo
-      return this;
-    }
-  }
-
-  @RequiredArgsConstructor
-  class DefaultMessageHandler implements MessageHandler
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandler handleMessage(Message message)
-    {
-      chatrooms[tp.partition()].put()
-      return this;
-    }
-  }
 }