fix: GREEN - `DataChannel` creates entries for existent chat-rooms
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index 4d5a141..d2d6f30 100644 (file)
@@ -36,6 +36,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   private final long[] currentOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomData>[] chatRoomData;
+  private final InfoChannel infoChannel;
 
   private boolean running;
   @Getter
@@ -49,7 +50,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     ZoneId zoneId,
     int numShards,
     int bufferSize,
-    Clock clock)
+    Clock clock,
+    InfoChannel infoChannel)
   {
     log.debug(
         "Creating DataChannel for topic {} with {} partitions",
@@ -68,10 +70,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     this.chatRoomData = new Map[numShards];
     IntStream
         .range(0, numShards)
-        .forEach(shard ->
-        {
-          this.chatRoomData[shard] = new HashMap<>();
-        });
+        .forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
+    this.infoChannel = infoChannel;
   }
 
 
@@ -296,6 +296,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       return Mono.error(new ShardNotOwnedException(shard));
     }
 
-    return Mono.justOrEmpty(chatRoomData[shard].get(id));
+    return infoChannel
+        .getChatRoomInfo(id)
+        .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent(
+            id,
+            (chatRoomId) ->
+            {
+              log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
+              KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
+              return new ChatRoomData(clock, service, bufferSize);
+            }));
   }
 }