WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
index 556a226..a5d63fd 100644 (file)
@@ -2,8 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -19,8 +17,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
 {
   private final Consumer<String, MessageTo> consumer;
   private final String topic;
-  private final long[] offsets;
-  private final MessageHandler[] handlers;
+  // private final long[] offsets; Erst mal immer alles neu einlesen
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
@@ -32,13 +29,11 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     log.debug("Creating KafkaChatHomeService");
     this.consumer = consumer;
     this.topic = topic;
-    this.offsets = new long[numShards];
-    this.handlers = new MessageHandler[numShards];
-    for (int i=0; i< numShards; i++)
-    {
-      this.offsets[i] = 0l;
-      this.handlers[i] = new MessageHandler(new TopicPartition(topic, i));
-    }
+    // this.offsets = new long[numShards];
+    // for (int i=0; i< numShards; i++)
+    // {
+    //   this.offsets[i] = 0l;
+    // }
     this.chatrooms = new Map[numShards];
   }
 
@@ -55,9 +50,18 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
       }
 
       int partition = tp.partition();
-      long unseenOffset = offsets[partition];
+      long unseenOffset = 0; // offsets[partition];
 
-      log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+      log.info(
+          "Loading messages from partition {}: start-offset={} -> current-offset={}",
+          partition,
+          unseenOffset,
+          currentOffset);
+
+      consumer.seek(tp, unseenOffset);
+      chatrooms[partition]
+          .values()
+          .stream()
       handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
     });
   }
@@ -136,65 +140,4 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
-
-
-  class MessageHandler
-  {
-
-  }
-
-  interface MessageHandlingStrategy
-  {
-    MessageHandlingStrategy handleMessage(Message message);
-  }
-
-
-  @RequiredArgsConstructor
-  class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      log.warn("Not handling message {} for partition {}", message, tp);
-      return this;
-    }
-  }
-
-  class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-    private final long currentOffset;
-    private final long unseenOffset;
-
-    ChatRoomLoadingMessageHandlingStrategy(TopicPartition tp, long currentOffset, long unseenOffset)
-    {
-      this.tp = tp;
-      this.currentOffset = currentOffset;
-      this.unseenOffset = unseenOffset;
-
-      consumer.seek(tp, unseenOffset);
-    }
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      // todo
-      return this;
-    }
-  }
-
-  @RequiredArgsConstructor
-  class DefaultMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      chatrooms[tp.partition()].put()
-      return this;
-    }
-  }
 }