X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;h=54aa41f1520efa44cd3bc0a6cf376219a9dfdaba;hb=31a32e75e4f1e4513a14f048325c0a5b52f08519;hp=5a41ebcc65c41575371bee7a13f4c887574d064a;hpb=e2933c8acd7ffd1c3a8530b0d34621c183ef09fa;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 5a41ebcc..54aa41f1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; @@ -40,25 +39,29 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ConsumerTaskRunner consumerTaskRunner( - ConsumerTaskExecutor infoChannelConsumerTaskExecutor, - ConsumerTaskExecutor dataChannelConsumerTaskExecutor, - InfoChannel infoChannel) + KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer() { - return new ConsumerTaskRunner( - infoChannelConsumerTaskExecutor, - dataChannelConsumerTaskExecutor, - infoChannel); + return new KafkaServicesThreadPoolTaskExecutorCustomizer(); } @Bean - ConsumerTaskExecutor infoChannelConsumerTaskExecutor( + ChannelTaskRunner channelTaskRunner( + ChannelTaskExecutor infoChannelTaskExecutor, + ChannelTaskExecutor dataChannelTaskExecutor) + { + return new ChannelTaskRunner( + infoChannelTaskExecutor, + dataChannelTaskExecutor); + } + + @Bean + ChannelTaskExecutor infoChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, Consumer infoChannelConsumer, WorkAssignor infoChannelWorkAssignor) { - return new ConsumerTaskExecutor( + return new ChannelTaskExecutor( taskExecutor, infoChannel, infoChannelConsumer, @@ -82,13 +85,13 @@ public class KafkaServicesConfiguration } @Bean - ConsumerTaskExecutor dataChannelConsumerTaskExecutor( + ChannelTaskExecutor dataChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, Consumer dataChannelConsumer, WorkAssignor dataChannelWorkAssignor) { - return new ConsumerTaskExecutor( + return new ChannelTaskExecutor( taskExecutor, dataChannel, dataChannelConsumer, @@ -124,14 +127,19 @@ public class KafkaServicesConfiguration InfoChannel infoChannel( ChatBackendProperties properties, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + ChannelMediator channelMediator) { - return new InfoChannel( + InfoChannel infoChannel = new InfoChannel( properties.getKafka().getInfoChannelTopic(), producer, infoChannelConsumer, + properties.getKafka().getPollingInterval(), properties.getKafka().getNumPartitions(), - properties.getKafka().getInstanceUri()); + properties.getKafka().getInstanceUri(), + channelMediator); + channelMediator.setInfoChannel(infoChannel); + return infoChannel; } @Bean @@ -141,20 +149,29 @@ public class KafkaServicesConfiguration Consumer dataChannelConsumer, ZoneId zoneId, Clock clock, - InfoChannel infoChannel, + ChannelMediator channelMediator, ShardingPublisherStrategy shardingPublisherStrategy) { - return new DataChannel( + DataChannel dataChannel = new DataChannel( properties.getInstanceId(), properties.getKafka().getDataChannelTopic(), producer, dataChannelConsumer, zoneId, properties.getKafka().getNumPartitions(), + properties.getKafka().getPollingInterval(), properties.getChatroomBufferSize(), clock, - infoChannel, + channelMediator, shardingPublisherStrategy); + channelMediator.setDataChannel(dataChannel); + return dataChannel; + } + + @Bean + ChannelMediator channelMediator() + { + return new ChannelMediator(); } @Bean @@ -303,4 +320,17 @@ public class KafkaServicesConfiguration { return ZoneId.systemDefault(); } + + @Bean + ChannelReactiveHealthIndicator dataChannelHealthIndicator( + DataChannel dataChannel) + { + return new ChannelReactiveHealthIndicator(dataChannel); + } + + @Bean + ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel) + { + return new ChannelReactiveHealthIndicator(infoChannel); + } }