From ca28496284c304e3d8e80d3d29417dcfb3a12bac Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 24 Mar 2024 14:47:43 +0100 Subject: [PATCH] refactor: `ChatMessageService` knows its corresponding `ChatRoomInfo` --- .../chat/backend/api/ChatBackendControllerAdvice.java | 2 +- .../kafka/chat/backend/domain/ChatMessageService.java | 3 +-- .../juplo/kafka/chat/backend/domain/ChatRoomData.java | 10 +++++----- .../domain/exceptions/ChatRoomInactiveException.java | 11 +++++------ .../inmemory/InMemoryChatMessageService.java | 9 +++++---- .../inmemory/SimpleChatHomeService.java | 7 +++---- .../backend/implementation/kafka/DataChannel.java | 2 +- .../implementation/kafka/KafkaChatMessageService.java | 6 +++--- 8 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java index 95977d8d..cdcee668 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java @@ -171,7 +171,7 @@ public class ChatBackendControllerAdvice problem.setDetail(e.getMessage()); - problem.setProperty("chatroom", e.getChatRoomId()); + problem.setProperty("chatroom", e.getChatRoomInfo()); return problem; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java index 293a2409..9bff96ba 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java @@ -4,12 +4,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; -import java.util.UUID; public interface ChatMessageService { - UUID getChatRoomId(); + ChatRoomInfo getChatRoomInfo(); Mono persistMessage( Message.MessageKey key, diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java index 20c046d8..c88e1c30 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java @@ -76,7 +76,7 @@ public class ChatRoomData log.warn("Emitting of message failed with {} for {}", result.name(), m); } }) - : Mono.error(new ChatRoomInactiveException(service.getChatRoomId()))); + : Mono.error(new ChatRoomInactiveException(service.getChatRoomInfo()))); } @@ -98,7 +98,7 @@ public class ChatRoomData .asFlux() .doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel! : Flux - .error(new ChatRoomInactiveException(service.getChatRoomId())); + .error(new ChatRoomInactiveException(service.getChatRoomInfo())); } @@ -116,18 +116,18 @@ public class ChatRoomData { if (active) { - log.info("{} is already active!", service.getChatRoomId()); + log.info("{} is already active!", service.getChatRoomInfo()); return; } - log.info("{} is being activated", service.getChatRoomId()); + log.info("{} is being activated", service.getChatRoomInfo()); this.sink = createSink(); active = true; } public void deactivate() { - log.info("{} is being deactivated", service.getChatRoomId()); + log.info("{} is being deactivated", service.getChatRoomInfo()); active = false; sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java index ca804e1c..e304e73a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java @@ -1,19 +1,18 @@ package de.juplo.kafka.chat.backend.domain.exceptions; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import lombok.Getter; -import java.util.UUID; - public class ChatRoomInactiveException extends IllegalStateException { @Getter - private final UUID chatRoomId; + private final ChatRoomInfo chatRoomInfo; - public ChatRoomInactiveException(UUID chatRoomId) + public ChatRoomInactiveException(ChatRoomInfo chatRoomInfo) { - super("Chat-Room " + chatRoomId + " is currently inactive."); - this.chatRoomId = chatRoomId; + super("Chat-Room " + chatRoomInfo + " is currently inactive."); + this.chatRoomInfo = chatRoomInfo; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java index a9a76a59..ac65054a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.implementation.inmemory; import de.juplo.kafka.chat.backend.domain.ChatMessageService; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import lombok.Getter; @@ -17,21 +18,21 @@ import java.util.UUID; public class InMemoryChatMessageService implements ChatMessageService { @Getter - private final UUID chatRoomId; + private final ChatRoomInfo chatRoomInfo; private final LinkedHashMap messages; - public InMemoryChatMessageService(UUID chatRoomId) + public InMemoryChatMessageService(ChatRoomInfo chatRoomInfo) { log.debug("Creating InMemoryChatMessageService"); - this.chatRoomId = chatRoomId; + this.chatRoomInfo = chatRoomInfo; messages = new LinkedHashMap<>(); } Mono restore(StorageStrategy storageStrategy) { - Flux messageFlux = storageStrategy.readChatRoomData(chatRoomId); + Flux messageFlux = storageStrategy.readChatRoomData(chatRoomInfo.getId()); return messageFlux .doOnNext(message -> messages.put(message.getKey(), message)) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index e78b049b..dc68bf89 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -71,11 +71,10 @@ public class SimpleChatHomeService implements ChatHomeService }) .flatMap(info -> { - UUID chatRoomId = info.getId(); InMemoryChatMessageService chatMessageService = - new InMemoryChatMessageService(chatRoomId); + new InMemoryChatMessageService(info); - chatRoomInfo.put(chatRoomId, info); + chatRoomInfo.put(info.getId(), info); ChatRoomData chatRoomData = new ChatRoomData( clock, @@ -98,8 +97,8 @@ public class SimpleChatHomeService implements ChatHomeService public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with history-limit {}", historyLimit); - ChatMessageService service = new InMemoryChatMessageService(id); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); + ChatMessageService service = new InMemoryChatMessageService(chatRoomInfo); this.chatRoomInfo.put(id, chatRoomInfo); ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit); chatRoomData.activate(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 0bffa0b2..327e9e6b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -389,7 +389,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener else { log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomInfo); chatRoomData = new ChatRoomData(clock, service, historyLimit); this.chatRoomData[shard].put(chatRoomId, chatRoomData); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java index f93a534a..01f28e89 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatMessageService; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -10,7 +11,6 @@ import reactor.core.publisher.Mono; import java.time.LocalDateTime; import java.util.LinkedHashMap; -import java.util.UUID; @RequiredArgsConstructor @@ -19,7 +19,7 @@ public class KafkaChatMessageService implements ChatMessageService { private final DataChannel dataChannel; @Getter - private final UUID chatRoomId; + private final ChatRoomInfo chatRoomInfo; private final LinkedHashMap messages = new LinkedHashMap<>(); @@ -31,7 +31,7 @@ public class KafkaChatMessageService implements ChatMessageService String text) { return dataChannel - .sendChatMessage(chatRoomId, key, timestamp, text) + .sendChatMessage(chatRoomInfo.getId(), key, timestamp, text) .doOnSuccess(message -> persistMessage(message)); } -- 2.39.5