]> juplo.de Git - demos/kafka/chat/commitdiff
refactor: `ChatMessageService` knows its corresponding `ChatRoomInfo`
authorKai Moritz <kai@juplo.de>
Sun, 24 Mar 2024 13:47:43 +0000 (14:47 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 24 Mar 2024 19:35:12 +0000 (20:35 +0100)
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java
src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChatRoomInactiveException.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java

index 95977d8d8a1d05955b7624afda2a94db96161ebc..cdcee668d330099011c4ef65e4e24afcad376269 100644 (file)
@@ -171,7 +171,7 @@ public class ChatBackendControllerAdvice
 
     problem.setDetail(e.getMessage());
 
-    problem.setProperty("chatroom", e.getChatRoomId());
+    problem.setProperty("chatroom", e.getChatRoomInfo());
 
     return problem;
   }
index 293a2409c2d8f6c1b10c37a260533b029a01f7f7..9bff96ba2894b4d4e9e3de8ac668a79d4ddb1a7b 100644 (file)
@@ -4,12 +4,11 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
-import java.util.UUID;
 
 
 public interface ChatMessageService
 {
-  UUID getChatRoomId();
+  ChatRoomInfo getChatRoomInfo();
 
   Mono<Message> persistMessage(
       Message.MessageKey key,
index 20c046d810a37231ba2a5a693db9ac7129b12aea..c88e1c30ab307b74999af33274e7deb80f3a5d9f 100644 (file)
@@ -76,7 +76,7 @@ public class ChatRoomData
                     log.warn("Emitting of message failed with {} for {}", result.name(), m);
                   }
                 })
-            : Mono.error(new ChatRoomInactiveException(service.getChatRoomId())));
+            : Mono.error(new ChatRoomInactiveException(service.getChatRoomInfo())));
   }
 
 
@@ -98,7 +98,7 @@ public class ChatRoomData
             .asFlux()
             .doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel!
         : Flux
-            .error(new ChatRoomInactiveException(service.getChatRoomId()));
+            .error(new ChatRoomInactiveException(service.getChatRoomInfo()));
 
   }
 
@@ -116,18 +116,18 @@ public class ChatRoomData
   {
     if (active)
     {
-      log.info("{} is already active!", service.getChatRoomId());
+      log.info("{} is already active!", service.getChatRoomInfo());
       return;
     }
 
-    log.info("{} is being activated", service.getChatRoomId());
+    log.info("{} is being activated", service.getChatRoomInfo());
     this.sink = createSink();
     active = true;
   }
 
   public void deactivate()
   {
-    log.info("{} is being deactivated", service.getChatRoomId());
+    log.info("{} is being deactivated", service.getChatRoomInfo());
     active = false;
     sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
   }
index ca804e1ca1cd5861d3c31f7284aa032b58e96334..e304e73ab2e3f6778b6fb395da5f20a66b27d547 100644 (file)
@@ -1,19 +1,18 @@
 package de.juplo.kafka.chat.backend.domain.exceptions;
 
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import lombok.Getter;
 
-import java.util.UUID;
-
 
 public class ChatRoomInactiveException extends IllegalStateException
 {
   @Getter
-  private final UUID chatRoomId;
+  private final ChatRoomInfo chatRoomInfo;
 
 
-  public ChatRoomInactiveException(UUID chatRoomId)
+  public ChatRoomInactiveException(ChatRoomInfo chatRoomInfo)
   {
-    super("Chat-Room " + chatRoomId + " is currently inactive.");
-    this.chatRoomId = chatRoomId;
+    super("Chat-Room " + chatRoomInfo + " is currently inactive.");
+    this.chatRoomInfo = chatRoomInfo;
   }
 }
index a9a76a590543b7188999c35835ce0bfe1a620027..ac65054ab8d333e2fce7dabee4497bb25c9debe1 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.inmemory;
 
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
 import lombok.Getter;
@@ -17,21 +18,21 @@ import java.util.UUID;
 public class InMemoryChatMessageService implements ChatMessageService
 {
   @Getter
-  private final UUID chatRoomId;
+  private final ChatRoomInfo chatRoomInfo;
   private final LinkedHashMap<Message.MessageKey, Message> messages;
 
 
-  public InMemoryChatMessageService(UUID chatRoomId)
+  public InMemoryChatMessageService(ChatRoomInfo chatRoomInfo)
   {
     log.debug("Creating InMemoryChatMessageService");
-    this.chatRoomId = chatRoomId;
+    this.chatRoomInfo = chatRoomInfo;
     messages = new LinkedHashMap<>();
   }
 
 
   Mono<Void> restore(StorageStrategy storageStrategy)
   {
-    Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
+    Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomInfo.getId());
 
     return messageFlux
         .doOnNext(message -> messages.put(message.getKey(), message))
index e78b049b07cbf15ef7b48ab3c2fcd8ffefd56385..dc68bf89258d43f106b54f13df6b630d3369496e 100644 (file)
@@ -71,11 +71,10 @@ public class SimpleChatHomeService implements ChatHomeService
         })
         .flatMap(info ->
         {
-          UUID chatRoomId = info.getId();
           InMemoryChatMessageService chatMessageService =
-              new InMemoryChatMessageService(chatRoomId);
+              new InMemoryChatMessageService(info);
 
-          chatRoomInfo.put(chatRoomId, info);
+          chatRoomInfo.put(info.getId(), info);
           ChatRoomData chatRoomData =
               new ChatRoomData(
                   clock,
@@ -98,8 +97,8 @@ public class SimpleChatHomeService implements ChatHomeService
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
     log.info("Creating ChatRoom with history-limit {}", historyLimit);
-    ChatMessageService service = new InMemoryChatMessageService(id);
     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+    ChatMessageService service = new InMemoryChatMessageService(chatRoomInfo);
     this.chatRoomInfo.put(id, chatRoomInfo);
     ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
     chatRoomData.activate();
index 0bffa0b25f0bef5e1923e880a39ea3733bb12dbf..327e9e6b235cd87e98c7637f6324085eb09b2438 100644 (file)
@@ -389,7 +389,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     else
     {
       log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
-      KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
+      KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomInfo);
       chatRoomData = new ChatRoomData(clock, service, historyLimit);
       this.chatRoomData[shard].put(chatRoomId, chatRoomData);
     }
index f93a534ab8305b14fce50930cb379541c942c911..01f28e8942d9a94b3e192c28d45a7f2814814661 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -10,7 +11,6 @@ import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
 import java.util.LinkedHashMap;
-import java.util.UUID;
 
 
 @RequiredArgsConstructor
@@ -19,7 +19,7 @@ public class KafkaChatMessageService implements ChatMessageService
 {
   private final DataChannel dataChannel;
   @Getter
-  private final UUID chatRoomId;
+  private final ChatRoomInfo chatRoomInfo;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
@@ -31,7 +31,7 @@ public class KafkaChatMessageService implements ChatMessageService
     String text)
   {
     return dataChannel
-        .sendChatMessage(chatRoomId, key, timestamp, text)
+        .sendChatMessage(chatRoomInfo.getId(), key, timestamp, text)
         .doOnSuccess(message -> persistMessage(message));
   }