]> juplo.de Git - demos/kafka/chat/commitdiff
refactor: Only `ChannelMediator#chatRoomCreated()` creates `ChatRoomData`
authorKai Moritz <kai@juplo.de>
Sat, 23 Mar 2024 15:33:13 +0000 (16:33 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 24 Mar 2024 19:28:59 +0000 (20:28 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index 0ead9ef96071e7345a7477202909ca934a778b7a..0bffa0b25f0bef5e1923e880a39ea3733bb12dbf 100644 (file)
@@ -287,7 +287,12 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
 
-    ChatRoomData chatRoomData = computeChatRoomData(chatRoomId, partition);
+    ChatRoomInfo chatRoomInfo = Mono
+        .just(chatRoomId)
+        .flatMap(id -> channelMediator.getChatRoomInfo(id))
+        .retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
+        .block();
+    ChatRoomData chatRoomData = this.chatRoomData[chatRoomInfo.getShard()].get(chatRoomId);
     KafkaChatMessageService kafkaChatRoomService =
         (KafkaChatMessageService) chatRoomData.getChatRoomService();
 
@@ -344,9 +349,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
   {
     int shard = chatRoomInfo.getShard();
 
-    ChatRoomData chatRoomData = computeChatRoomData(
-        chatRoomInfo.getId(),
-        chatRoomInfo.getShard());
+    ChatRoomData chatRoomData = computeChatRoomData(chatRoomInfo);
 
     // TODO: Possible race-condition in case of an ongoing rebalance!
     if (isShardOwned[shard])
@@ -371,15 +374,17 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     return Mono.justOrEmpty(chatRoomData[shard].get(id));
   }
 
-  private ChatRoomData computeChatRoomData(UUID chatRoomId, int shard)
+  private ChatRoomData computeChatRoomData(ChatRoomInfo chatRoomInfo)
   {
+    UUID chatRoomId = chatRoomInfo.getId();
+    int shard = chatRoomInfo.getShard();
     ChatRoomData chatRoomData = this.chatRoomData[shard].get(chatRoomId);
 
     if (chatRoomData != null)
     {
-      log.info(
+      log.error(
           "Ignoring request to create already existing ChatRoomData for {}",
-          chatRoomId);
+          chatRoomInfo);
     }
     else
     {