WIP
authorKai Moritz <kai@juplo.de>
Fri, 17 Feb 2023 18:11:02 +0000 (19:11 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 11:14:59 +0000 (12:14 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/NoOpMessageHandlingStrategy.java [new file with mode: 0644]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java
new file mode 100644 (file)
index 0000000..8eac990
--- /dev/null
@@ -0,0 +1,19 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+
+
+@RequiredArgsConstructor
+class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
+{
+  private final TopicPartition tp;
+
+  @Override
+  public MessageHandlingStrategy handleMessage(Message message)
+  {
+    chatrooms[tp.partition()].put()
+    return this;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java
new file mode 100644 (file)
index 0000000..af99b80
--- /dev/null
@@ -0,0 +1,35 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+
+class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
+{
+  private final Consumer consumer;
+  private final TopicPartition tp;
+  private final long currentOffset;
+  private final long unseenOffset;
+
+  ChatRoomLoadingMessageHandlingStrategy(
+      Consumer consumer,
+      TopicPartition tp,
+      long currentOffset,
+      long unseenOffset)
+  {
+    this.consumer = consumer;
+    this.tp = tp;
+    this.currentOffset = currentOffset;
+    this.unseenOffset = unseenOffset;
+
+    consumer.seek(tp, unseenOffset);
+  }
+
+  @Override
+  public MessageHandlingStrategy handleMessage(Message message)
+  {
+    // todo
+    return this;
+  }
+}
index 556a226..70a96bc 100644 (file)
@@ -2,8 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -136,65 +134,4 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
-
-
-  class MessageHandler
-  {
-
-  }
-
-  interface MessageHandlingStrategy
-  {
-    MessageHandlingStrategy handleMessage(Message message);
-  }
-
-
-  @RequiredArgsConstructor
-  class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      log.warn("Not handling message {} for partition {}", message, tp);
-      return this;
-    }
-  }
-
-  class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-    private final long currentOffset;
-    private final long unseenOffset;
-
-    ChatRoomLoadingMessageHandlingStrategy(TopicPartition tp, long currentOffset, long unseenOffset)
-    {
-      this.tp = tp;
-      this.currentOffset = currentOffset;
-      this.unseenOffset = unseenOffset;
-
-      consumer.seek(tp, unseenOffset);
-    }
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      // todo
-      return this;
-    }
-  }
-
-  @RequiredArgsConstructor
-  class DefaultMessageHandlingStrategy implements MessageHandlingStrategy
-  {
-    private final TopicPartition tp;
-
-    @Override
-    public MessageHandlingStrategy handleMessage(Message message)
-    {
-      chatrooms[tp.partition()].put()
-      return this;
-    }
-  }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java
new file mode 100644 (file)
index 0000000..271f52d
--- /dev/null
@@ -0,0 +1,6 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+class MessageHandler
+{
+
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java
new file mode 100644 (file)
index 0000000..194b4d0
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+
+
+interface MessageHandlingStrategy
+{
+  MessageHandlingStrategy handleMessage(Message message);
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/NoOpMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/NoOpMessageHandlingStrategy.java
new file mode 100644 (file)
index 0000000..c25418a
--- /dev/null
@@ -0,0 +1,19 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+
+
+@RequiredArgsConstructor
+class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
+{
+  private final TopicPartition tp;
+
+  @Override
+  public MessageHandlingStrategy handleMessage(Message message)
+  {
+    KafkaChatHomeService.log.warn("Not handling message {} for partition {}", message, tp);
+    return this;
+  }
+}