X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;h=58e111734d40b8c20a7bd39ff710951ece8a4353;hb=900422dccb5a92fbceac34caa5e614b0d7f05ad7;hp=c7cf113a11d89494a2c0b448db95d8a68adaf610;hpb=5168b2e2a25b5890fe39ea5fba16688f81b3d3a2;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index c7cf113a..58e11173 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -39,16 +39,12 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ChannelTaskRunner channelTaskRunner( - ChannelTaskExecutor infoChannelTaskExecutor, - ChannelTaskExecutor dataChannelTaskExecutor) + KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer() { - return new ChannelTaskRunner( - infoChannelTaskExecutor, - dataChannelTaskExecutor); + return new KafkaServicesThreadPoolTaskExecutorCustomizer(); } - @Bean + @Bean(initMethod = "executeChannelTask", destroyMethod = "join") ChannelTaskExecutor infoChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, @@ -78,7 +74,7 @@ public class KafkaServicesConfiguration }; } - @Bean + @Bean(initMethod = "executeChannelTask", destroyMethod = "join") ChannelTaskExecutor dataChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, @@ -154,7 +150,7 @@ public class KafkaServicesConfiguration zoneId, properties.getKafka().getNumPartitions(), properties.getKafka().getPollingInterval(), - properties.getChatroomBufferSize(), + properties.getChatroomHistoryLimit(), clock, channelMediator, shardingPublisherStrategy); @@ -314,4 +310,17 @@ public class KafkaServicesConfiguration { return ZoneId.systemDefault(); } + + @Bean + ChannelReactiveHealthIndicator dataChannelHealthIndicator( + DataChannel dataChannel) + { + return new ChannelReactiveHealthIndicator(dataChannel); + } + + @Bean + ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel) + { + return new ChannelReactiveHealthIndicator(infoChannel); + } }