fix: GREEN - `DataChannel` creates entries for existent chat-rooms
authorKai Moritz <kai@juplo.de>
Fri, 15 Sep 2023 18:56:14 +0000 (20:56 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 09:35:33 +0000 (10:35 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java

index 4d5a141..d2d6f30 100644 (file)
@@ -36,6 +36,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   private final long[] currentOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomData>[] chatRoomData;
+  private final InfoChannel infoChannel;
 
   private boolean running;
   @Getter
@@ -49,7 +50,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     ZoneId zoneId,
     int numShards,
     int bufferSize,
-    Clock clock)
+    Clock clock,
+    InfoChannel infoChannel)
   {
     log.debug(
         "Creating DataChannel for topic {} with {} partitions",
@@ -68,10 +70,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     this.chatRoomData = new Map[numShards];
     IntStream
         .range(0, numShards)
-        .forEach(shard ->
-        {
-          this.chatRoomData[shard] = new HashMap<>();
-        });
+        .forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
+    this.infoChannel = infoChannel;
   }
 
 
@@ -296,6 +296,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       return Mono.error(new ShardNotOwnedException(shard));
     }
 
-    return Mono.justOrEmpty(chatRoomData[shard].get(id));
+    return infoChannel
+        .getChatRoomInfo(id)
+        .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent(
+            id,
+            (chatRoomId) ->
+            {
+              log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
+              KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
+              return new ChatRoomData(clock, service, bufferSize);
+            }));
   }
 }
index ad03f0d..26e8696 100644 (file)
@@ -84,7 +84,7 @@ public class InfoChannel implements Runnable
         if (metadata != null)
         {
           log.info("Successfully sent chreate-request for chat room: {}", to);
-          ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
+          ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
           sink.success(chatRoomInfo);
         }
         else
index b5bac47..784ffa5 100644 (file)
@@ -133,7 +133,8 @@ public class KafkaServicesConfiguration
       Producer<String, AbstractMessageTo> producer,
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
-      Clock clock)
+      Clock clock,
+      InfoChannel infoChannel)
   {
     return new DataChannel(
         properties.getKafka().getDataChannelTopic(),
@@ -142,7 +143,8 @@ public class KafkaServicesConfiguration
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
-        clock);
+        clock,
+        infoChannel);
   }
 
   @Bean
index 3eb9096..b1bead9 100644 (file)
@@ -36,7 +36,7 @@ public class MongoDbStorageStrategy implements StorageStrategy
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
-          return new ChatRoomInfo(chatRoomId, chatRoomTo.getName());
+          return new ChatRoomInfo(chatRoomId, chatRoomTo.getName(), null);
         });
   }