refactor: Removed config-dependencies from `ChannelTaskExecutor`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index c7cf113..b28b690 100644 (file)
@@ -38,6 +38,12 @@ import java.util.Properties;
 @Configuration
 public class KafkaServicesConfiguration
 {
+  @Bean
+  KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer()
+  {
+    return new KafkaServicesThreadPoolTaskExecutorCustomizer();
+  }
+
   @Bean
   ChannelTaskRunner channelTaskRunner(
       ChannelTaskExecutor infoChannelTaskExecutor,
@@ -48,7 +54,7 @@ public class KafkaServicesConfiguration
         dataChannelTaskExecutor);
   }
 
-  @Bean
+  @Bean(destroyMethod = "join")
   ChannelTaskExecutor infoChannelTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       InfoChannel infoChannel,
@@ -78,7 +84,7 @@ public class KafkaServicesConfiguration
     };
   }
 
-  @Bean
+  @Bean(destroyMethod = "join")
   ChannelTaskExecutor dataChannelTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       DataChannel dataChannel,
@@ -314,4 +320,17 @@ public class KafkaServicesConfiguration
   {
     return ZoneId.systemDefault();
   }
+
+  @Bean
+  ChannelReactiveHealthIndicator dataChannelHealthIndicator(
+      DataChannel dataChannel)
+  {
+    return new ChannelReactiveHealthIndicator(dataChannel);
+  }
+
+  @Bean
+  ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel)
+  {
+    return new ChannelReactiveHealthIndicator(infoChannel);
+  }
 }