refactor: Introduced `ChannelMediator`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index f8bebd6..f78beb1 100644 (file)
@@ -109,7 +109,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-    ChatHomeService kafkaChatHome(
+  KafkaChatHomeService kafkaChatHome(
       ChatBackendProperties properties,
       InfoChannel infoChannel,
       DataChannel dataChannel)
@@ -124,13 +124,18 @@ public class KafkaServicesConfiguration
   InfoChannel infoChannel(
       ChatBackendProperties properties,
       Producer<String, AbstractMessageTo> producer,
-      Consumer<String, AbstractMessageTo> infoChannelConsumer)
+      Consumer<String, AbstractMessageTo> infoChannelConsumer,
+      ChannelMediator channelMediator)
   {
-    return new InfoChannel(
+    InfoChannel infoChannel = new InfoChannel(
         properties.getKafka().getInfoChannelTopic(),
         producer,
         infoChannelConsumer,
+        properties.getKafka().getPollingInterval(),
+        properties.getKafka().getNumPartitions(),
         properties.getKafka().getInstanceUri());
+    channelMediator.setInfoChannel(infoChannel);
+    return infoChannel;
   }
 
   @Bean
@@ -140,7 +145,7 @@ public class KafkaServicesConfiguration
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock,
-      InfoChannel infoChannel,
+      ChannelMediator channelMediator,
       ShardingPublisherStrategy shardingPublisherStrategy)
   {
     return new DataChannel(
@@ -150,12 +155,19 @@ public class KafkaServicesConfiguration
         dataChannelConsumer,
         zoneId,
         properties.getKafka().getNumPartitions(),
+        properties.getKafka().getPollingInterval(),
         properties.getChatroomBufferSize(),
         clock,
-        infoChannel,
+        channelMediator,
         shardingPublisherStrategy);
   }
 
+  @Bean
+  ChannelMediator channelMediator()
+  {
+    return new ChannelMediator();
+  }
+
   @Bean
   Producer<String, AbstractMessageTo>  producer(
       Properties defaultProducerProperties,