WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
index 556a226..b2ed284 100644 (file)
@@ -2,8 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -20,7 +18,6 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   private final Consumer<String, MessageTo> consumer;
   private final String topic;
   private final long[] offsets;
-  private final MessageHandler[] handlers;
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
@@ -33,13 +30,14 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     this.consumer = consumer;
     this.topic = topic;
     this.offsets = new long[numShards];
-    this.handlers = new MessageHandler[numShards];
+    this.chatrooms = new Map<>[numShards];
     for (int i=0; i< numShards; i++)
     {
       this.offsets[i] = 0l;
-      this.handlers[i] = new MessageHandler(new TopicPartition(topic, i));
+      this.chatrooms[i] = chat
     }
     this.chatrooms = new Map[numShards];
+    this.chatrooms = new Map[numShards];
   }
 
 
@@ -57,7 +55,11 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
       int partition = tp.partition();
       long unseenOffset = offsets[partition];
 
-      log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+      log.info(
+          "Loading messages from partition {}: start-offset={} -> current-offset={}",
+          partition,
+          unseenOffset,
+          currentOffset);
       handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
     });
   }
@@ -136,65 +138,4 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
-
-
-  class MessageHandler
-  {
-
-  }
-
-  interface MessageHandlingStrategy
-  {
-    MessageHandlingStrategy handleMessage(Message message);
-  }
-
-
-  @RequiredArgsConstructor
-  class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      log.warn("Not handling message {} for partition {}", message, tp);
-      return this;
-    }
-  }
-
-  class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-    private final long currentOffset;
-    private final long unseenOffset;
-
-    ChatRoomLoadingMessageHandlingStrategy(TopicPartition tp, long currentOffset, long unseenOffset)
-    {
-      this.tp = tp;
-      this.currentOffset = currentOffset;
-      this.unseenOffset = unseenOffset;
-
-      consumer.seek(tp, unseenOffset);
-    }
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      // todo
-      return this;
-    }
-  }
-
-  @RequiredArgsConstructor
-  class DefaultMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      chatrooms[tp.partition()].put()
-      return this;
-    }
-  }
 }