From: Kai Moritz Date: Sun, 26 Feb 2023 14:12:18 +0000 (+0100) Subject: WIP X-Git-Tag: kafkadata~42 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2c970f2179bab526d8cb09f160ecbe5d9f65aee9;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java index 49ace7ae..cfd0e4ec 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java @@ -27,7 +27,7 @@ class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy @Override - public Mono persistMessage( + public Mono handleMessage( Message.MessageKey key, LocalDateTime timestamp, String text) @@ -63,9 +63,4 @@ class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy })); }); } - - @Override - public MessageHandlingStrategy handleMessage(Message message) - { - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 91b50312..c4737a14 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -48,9 +48,13 @@ public class KafkaChatRoomService implements ChatRoomService LocalDateTime timestamp, String text) { - return strategy.persistMessage(key, timestamp, text); + return strategy.handleMessage(key, timestamp, text); } + /** + * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über + * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)} + */ protected void addMessage(Message message) throws MessageMutationException { Message existingMessage = messages.get(message.getKey()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java deleted file mode 100644 index 7209c224..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandler.java +++ /dev/null @@ -1,20 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; - - -@RequiredArgsConstructor -class MessageHandler -{ - private final Consumer consumer; - private final TopicPartition tp; - - - void handleMessage(Message message) - { - - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java index 097ad736..1fb4c47d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java @@ -8,10 +8,8 @@ import java.time.LocalDateTime; interface MessageHandlingStrategy { - Mono persistMessage( + Mono handleMessage( Message.MessageKey key, LocalDateTime timestamp, String text); - - MessageHandlingStrategy handleMessage(Message message); }