From 320f2e00a39f4cb7428446223cb9fb2f37d12e25 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 17 Feb 2023 19:11:02 +0100 Subject: [PATCH] WIP --- ...ChatRoomActiveMessageHandlingStrategy.java | 19 ++++++ ...hatRoomLoadingMessageHandlingStrategy.java | 35 +++++++++++ .../kafka/KafkaChatHomeService.java | 63 ------------------- .../persistence/kafka/MessageHandler.java | 6 ++ .../kafka/MessageHandlingStrategy.java | 9 +++ .../kafka/NoOpMessageHandlingStrategy.java | 19 ++++++ 6 files changed, 88 insertions(+), 63 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/NoOpMessageHandlingStrategy.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java new file mode 100644 index 00000000..8eac9900 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.TopicPartition; + + +@RequiredArgsConstructor +class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy +{ + private final TopicPartition tp; + + @Override + public MessageHandlingStrategy handleMessage(Message message) + { + chatrooms[tp.partition()].put() + return this; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java new file mode 100644 index 00000000..af99b809 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; + + +class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy +{ + private final Consumer consumer; + private final TopicPartition tp; + private final long currentOffset; + private final long unseenOffset; + + ChatRoomLoadingMessageHandlingStrategy( + Consumer consumer, + TopicPartition tp, + long currentOffset, + long unseenOffset) + { + this.consumer = consumer; + this.tp = tp; + this.currentOffset = currentOffset; + this.unseenOffset = unseenOffset; + + consumer.seek(tp, unseenOffset); + } + + @Override + public MessageHandlingStrategy handleMessage(Message message) + { + // todo + return this; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 556a2268..70a96bc3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -2,8 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -136,65 +134,4 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL { return Flux.fromStream(chatrooms[shard].values().stream()); } - - - class MessageHandler - { - - } - - interface MessageHandlingStrategy - { - MessageHandlingStrategy handleMessage(Message message); - } - - - @RequiredArgsConstructor - class NoOpMessageHandlingStrategy implements MessageHandlingStrategy - { - private final TopicPartition tp; - - @Override - public MessageHandlingStrategy handleMessage(Message message) - { - log.warn("Not handling message {} for partition {}", message, tp); - return this; - } - } - - class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy - { - private final TopicPartition tp; - private final long currentOffset; - private final long unseenOffset; - - ChatRoomLoadingMessageHandlingStrategy(TopicPartition tp, long currentOffset, long unseenOffset) - { - this.tp = tp; - this.currentOffset = currentOffset; - this.unseenOffset = unseenOffset; - - consumer.seek(tp, unseenOffset); - } - - @Override - public MessageHandlingStrategy handleMessage(Message message) - { - // todo - return this; - } - } - - @RequiredArgsConstructor - class DefaultMessageHandlingStrategy implements MessageHandlingStrategy - { - private final TopicPartition tp; - - @Override - public MessageHandlingStrategy handleMessage(Message message) - { - chatrooms[tp.partition()].put() - return this; - } - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java new file mode 100644 index 00000000..271f52d8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java @@ -0,0 +1,6 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +class MessageHandler +{ + +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java new file mode 100644 index 00000000..194b4d04 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; + + +interface MessageHandlingStrategy +{ + MessageHandlingStrategy handleMessage(Message message); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/NoOpMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/NoOpMessageHandlingStrategy.java new file mode 100644 index 00000000..c25418a8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/NoOpMessageHandlingStrategy.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.TopicPartition; + + +@RequiredArgsConstructor +class NoOpMessageHandlingStrategy implements MessageHandlingStrategy +{ + private final TopicPartition tp; + + @Override + public MessageHandlingStrategy handleMessage(Message message) + { + KafkaChatHomeService.log.warn("Not handling message {} for partition {}", message, tp); + return this; + } +} -- 2.20.1