TMP:test -- FIX: `ChatRoomData` active/inactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index c7cf113..58e1117 100644 (file)
@@ -39,16 +39,12 @@ import java.util.Properties;
 public class KafkaServicesConfiguration
 {
   @Bean
-  ChannelTaskRunner channelTaskRunner(
-      ChannelTaskExecutor infoChannelTaskExecutor,
-      ChannelTaskExecutor dataChannelTaskExecutor)
+  KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer()
   {
-    return new ChannelTaskRunner(
-        infoChannelTaskExecutor,
-        dataChannelTaskExecutor);
+    return new KafkaServicesThreadPoolTaskExecutorCustomizer();
   }
 
-  @Bean
+  @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
   ChannelTaskExecutor infoChannelTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       InfoChannel infoChannel,
@@ -78,7 +74,7 @@ public class KafkaServicesConfiguration
     };
   }
 
-  @Bean
+  @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
   ChannelTaskExecutor dataChannelTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       DataChannel dataChannel,
@@ -154,7 +150,7 @@ public class KafkaServicesConfiguration
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getKafka().getPollingInterval(),
-        properties.getChatroomBufferSize(),
+        properties.getChatroomHistoryLimit(),
         clock,
         channelMediator,
         shardingPublisherStrategy);
@@ -314,4 +310,17 @@ public class KafkaServicesConfiguration
   {
     return ZoneId.systemDefault();
   }
+
+  @Bean
+  ChannelReactiveHealthIndicator dataChannelHealthIndicator(
+      DataChannel dataChannel)
+  {
+    return new ChannelReactiveHealthIndicator(dataChannel);
+  }
+
+  @Bean
+  ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel)
+  {
+    return new ChannelReactiveHealthIndicator(infoChannel);
+  }
 }