--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+
+
+@RequiredArgsConstructor
+class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
+{
+ private final TopicPartition tp;
+
+ @Override
+ public MessageHandlingStrategy handleMessage(Message message)
+ {
+ chatrooms[tp.partition()].put()
+ return this;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+
+class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
+{
+ private final Consumer consumer;
+ private final TopicPartition tp;
+ private final long currentOffset;
+ private final long unseenOffset;
+
+ ChatRoomLoadingMessageHandlingStrategy(
+ Consumer consumer,
+ TopicPartition tp,
+ long currentOffset,
+ long unseenOffset)
+ {
+ this.consumer = consumer;
+ this.tp = tp;
+ this.currentOffset = currentOffset;
+ this.unseenOffset = unseenOffset;
+
+ consumer.seek(tp, unseenOffset);
+ }
+
+ @Override
+ public MessageHandlingStrategy handleMessage(Message message)
+ {
+ // todo
+ return this;
+ }
+}
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
{
return Flux.fromStream(chatrooms[shard].values().stream());
}
-
-
- class MessageHandler
- {
-
- }
-
- interface MessageHandlingStrategy
- {
- MessageHandlingStrategy handleMessage(Message message);
- }
-
-
- @RequiredArgsConstructor
- class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
- {
- private final TopicPartition tp;
-
- @Override
- public MessageHandlingStrategy handleMessage(Message message)
- {
- log.warn("Not handling message {} for partition {}", message, tp);
- return this;
- }
- }
-
- class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
- {
- private final TopicPartition tp;
- private final long currentOffset;
- private final long unseenOffset;
-
- ChatRoomLoadingMessageHandlingStrategy(TopicPartition tp, long currentOffset, long unseenOffset)
- {
- this.tp = tp;
- this.currentOffset = currentOffset;
- this.unseenOffset = unseenOffset;
-
- consumer.seek(tp, unseenOffset);
- }
-
- @Override
- public MessageHandlingStrategy handleMessage(Message message)
- {
- // todo
- return this;
- }
- }
-
- @RequiredArgsConstructor
- class DefaultMessageHandlingStrategy implements MessageHandlingStrategy
- {
- private final TopicPartition tp;
-
- @Override
- public MessageHandlingStrategy handleMessage(Message message)
- {
- chatrooms[tp.partition()].put()
- return this;
- }
- }
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+class MessageHandler
+{
+
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+
+
+interface MessageHandlingStrategy
+{
+ MessageHandlingStrategy handleMessage(Message message);
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+
+
+@RequiredArgsConstructor
+class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
+{
+ private final TopicPartition tp;
+
+ @Override
+ public MessageHandlingStrategy handleMessage(Message message)
+ {
+ KafkaChatHomeService.log.warn("Not handling message {} for partition {}", message, tp);
+ return this;
+ }
+}