fix: Fixed `ConcurrentModificationException` when accessing a chat-room
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index f78beb1..3337127 100644 (file)
@@ -133,7 +133,8 @@ public class KafkaServicesConfiguration
         infoChannelConsumer,
         properties.getKafka().getPollingInterval(),
         properties.getKafka().getNumPartitions(),
-        properties.getKafka().getInstanceUri());
+        properties.getKafka().getInstanceUri(),
+        channelMediator);
     channelMediator.setInfoChannel(infoChannel);
     return infoChannel;
   }
@@ -148,7 +149,7 @@ public class KafkaServicesConfiguration
       ChannelMediator channelMediator,
       ShardingPublisherStrategy shardingPublisherStrategy)
   {
-    return new DataChannel(
+    DataChannel dataChannel = new DataChannel(
         properties.getInstanceId(),
         properties.getKafka().getDataChannelTopic(),
         producer,
@@ -160,6 +161,8 @@ public class KafkaServicesConfiguration
         clock,
         channelMediator,
         shardingPublisherStrategy);
+    channelMediator.setDataChannel(dataChannel);
+    return dataChannel;
   }
 
   @Bean