X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;h=33371279b8c9de92b2ffa06615d128eed74491a5;hb=eaec0e92a1887c6b1c0059de1b5db44039dc1dd4;hp=b5bac4700ca8d56f1377febf653074a02343934c;hpb=5893c8aa8ca4b3cb4d85dd11daa48ddc0acfd759;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index b5bac470..33371279 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; @@ -21,6 +23,7 @@ import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import java.net.InetSocketAddress; import java.time.Clock; import java.time.ZoneId; import java.util.HashMap; @@ -39,11 +42,13 @@ public class KafkaServicesConfiguration @Bean ConsumerTaskRunner consumerTaskRunner( ConsumerTaskExecutor infoChannelConsumerTaskExecutor, - ConsumerTaskExecutor dataChannelConsumerTaskExecutor) + ConsumerTaskExecutor dataChannelConsumerTaskExecutor, + InfoChannel infoChannel) { return new ConsumerTaskRunner( infoChannelConsumerTaskExecutor, - dataChannelConsumerTaskExecutor); + dataChannelConsumerTaskExecutor, + infoChannel); } @Bean @@ -104,7 +109,7 @@ public class KafkaServicesConfiguration } @Bean - ChatHomeService kafkaChatHome( + KafkaChatHomeService kafkaChatHome( ChatBackendProperties properties, InfoChannel infoChannel, DataChannel dataChannel) @@ -119,12 +124,19 @@ public class KafkaServicesConfiguration InfoChannel infoChannel( ChatBackendProperties properties, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + ChannelMediator channelMediator) { - return new InfoChannel( + InfoChannel infoChannel = new InfoChannel( properties.getKafka().getInfoChannelTopic(), producer, - infoChannelConsumer); + infoChannelConsumer, + properties.getKafka().getPollingInterval(), + properties.getKafka().getNumPartitions(), + properties.getKafka().getInstanceUri(), + channelMediator); + channelMediator.setInfoChannel(infoChannel); + return infoChannel; } @Bean @@ -133,16 +145,30 @@ public class KafkaServicesConfiguration Producer producer, Consumer dataChannelConsumer, ZoneId zoneId, - Clock clock) + Clock clock, + ChannelMediator channelMediator, + ShardingPublisherStrategy shardingPublisherStrategy) { - return new DataChannel( + DataChannel dataChannel = new DataChannel( + properties.getInstanceId(), properties.getKafka().getDataChannelTopic(), producer, dataChannelConsumer, zoneId, properties.getKafka().getNumPartitions(), + properties.getKafka().getPollingInterval(), properties.getChatroomBufferSize(), - clock); + clock, + channelMediator, + shardingPublisherStrategy); + channelMediator.setDataChannel(dataChannel); + return dataChannel; + } + + @Bean + ChannelMediator channelMediator() + { + return new ChannelMediator(); } @Bean @@ -274,6 +300,18 @@ public class KafkaServicesConfiguration return properties; } + @Bean + ShardingPublisherStrategy shardingPublisherStrategy( + ChatBackendProperties properties) + { + String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":"); + InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1])); + return new HaproxyShardingPublisherStrategy( + haproxyAddress, + properties.getKafka().getHaproxyMap(), + properties.getInstanceId()); + } + @Bean ZoneId zoneId() {