fix: `ConsumerTaskRunner` waits until the data-loading is finished
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index 5bde07c..cafc775 100644 (file)
@@ -39,11 +39,13 @@ public class KafkaServicesConfiguration
   @Bean
   ConsumerTaskRunner consumerTaskRunner(
       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
-      ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
+      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+      InfoChannel infoChannel)
   {
     return new ConsumerTaskRunner(
         infoChannelConsumerTaskExecutor,
-        dataChannelConsumerTaskExecutor);
+        dataChannelConsumerTaskExecutor,
+        infoChannel);
   }
 
   @Bean
@@ -51,7 +53,7 @@ public class KafkaServicesConfiguration
       ThreadPoolTaskExecutor taskExecutor,
       InfoChannel infoChannel,
       Consumer<String, AbstractMessageTo> infoChannelConsumer,
-      ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor)
+      WorkAssignor infoChannelWorkAssignor)
   {
     return new ConsumerTaskExecutor(
         taskExecutor,
@@ -61,8 +63,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor(
-      ChatBackendProperties properties)
+  WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
   {
     return consumer ->
     {
@@ -82,7 +83,7 @@ public class KafkaServicesConfiguration
       ThreadPoolTaskExecutor taskExecutor,
       DataChannel dataChannel,
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
-      ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor)
+      WorkAssignor dataChannelWorkAssignor)
   {
     return new ConsumerTaskExecutor(
         taskExecutor,
@@ -92,7 +93,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
+  WorkAssignor dataChannelWorkAssignor(
       ChatBackendProperties properties,
       DataChannel dataChannel)
   {
@@ -125,7 +126,8 @@ public class KafkaServicesConfiguration
     return new InfoChannel(
         properties.getKafka().getInfoChannelTopic(),
         producer,
-        infoChannelConsumer);
+        infoChannelConsumer,
+        properties.getKafka().getInstanceUri());
   }
 
   @Bean
@@ -134,7 +136,8 @@ public class KafkaServicesConfiguration
       Producer<String, AbstractMessageTo> producer,
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
-      Clock clock)
+      Clock clock,
+      InfoChannel infoChannel)
   {
     return new DataChannel(
         properties.getKafka().getDataChannelTopic(),
@@ -143,7 +146,8 @@ public class KafkaServicesConfiguration
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
-        clock);
+        clock,
+        infoChannel);
   }
 
   @Bean