fix: Fixed `ConcurrentModificationException` when accessing a chat-room
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
index a6351d0..f3150ce 100644 (file)
@@ -39,6 +39,7 @@ public class InfoChannel implements Runnable
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
   private final String instanceUri;
+  private final ChannelMediator channelMediator;
 
   private boolean running;
   @Getter
@@ -51,7 +52,8 @@ public class InfoChannel implements Runnable
     Consumer<String, AbstractMessageTo> infoChannelConsumer,
     Duration pollingInterval,
     int numShards,
-    URI instanceUri)
+    URI instanceUri,
+    ChannelMediator channelMediator)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
@@ -72,6 +74,8 @@ public class InfoChannel implements Runnable
         .forEach(partition -> this.nextOffset[partition] = -1l);
 
     this.instanceUri = instanceUri.toASCIIString();
+
+    this.channelMediator = channelMediator;
   }
 
 
@@ -282,6 +286,7 @@ public class InfoChannel implements Runnable
           chatRoomId);
 
       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
+      this.channelMediator.chatRoomCreated(chatRoomInfo);
     }
   }