WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
index 556a226..2e3b42f 100644 (file)
@@ -2,8 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -37,7 +35,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     for (int i=0; i< numShards; i++)
     {
       this.offsets[i] = 0l;
-      this.handlers[i] = new MessageHandler(new TopicPartition(topic, i));
+      this.handlers[i] = new MessageHandler(consumer, new TopicPartition(topic, i));
     }
     this.chatrooms = new Map[numShards];
   }
@@ -136,65 +134,4 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
-
-
-  class MessageHandler
-  {
-
-  }
-
-  interface MessageHandlingStrategy
-  {
-    MessageHandlingStrategy handleMessage(Message message);
-  }
-
-
-  @RequiredArgsConstructor
-  class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      log.warn("Not handling message {} for partition {}", message, tp);
-      return this;
-    }
-  }
-
-  class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-    private final long currentOffset;
-    private final long unseenOffset;
-
-    ChatRoomLoadingMessageHandlingStrategy(TopicPartition tp, long currentOffset, long unseenOffset)
-    {
-      this.tp = tp;
-      this.currentOffset = currentOffset;
-      this.unseenOffset = unseenOffset;
-
-      consumer.seek(tp, unseenOffset);
-    }
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      // todo
-      return this;
-    }
-  }
-
-  @RequiredArgsConstructor
-  class DefaultMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      chatrooms[tp.partition()].put()
-      return this;
-    }
-  }
 }