X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;h=54c48301588b9a970cca5d5da574bf4a7edfaef6;hb=f70bb25f9b0769068028348995a78ece2130cd35;hp=784ffa544d65cc4025a2f7ddae62ad71e4054eaf;hpb=9b0879d7ef3a4811ef48fb9190558937a07f1194;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 784ffa54..54c48301 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; @@ -21,6 +23,7 @@ import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import java.net.InetSocketAddress; import java.time.Clock; import java.time.ZoneId; import java.util.HashMap; @@ -39,11 +42,13 @@ public class KafkaServicesConfiguration @Bean ConsumerTaskRunner consumerTaskRunner( ConsumerTaskExecutor infoChannelConsumerTaskExecutor, - ConsumerTaskExecutor dataChannelConsumerTaskExecutor) + ConsumerTaskExecutor dataChannelConsumerTaskExecutor, + InfoChannel infoChannel) { return new ConsumerTaskRunner( infoChannelConsumerTaskExecutor, - dataChannelConsumerTaskExecutor); + dataChannelConsumerTaskExecutor, + infoChannel); } @Bean @@ -124,7 +129,8 @@ public class KafkaServicesConfiguration return new InfoChannel( properties.getKafka().getInfoChannelTopic(), producer, - infoChannelConsumer); + infoChannelConsumer, + properties.getKafka().getInstanceUri()); } @Bean @@ -134,9 +140,11 @@ public class KafkaServicesConfiguration Consumer dataChannelConsumer, ZoneId zoneId, Clock clock, - InfoChannel infoChannel) + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { return new DataChannel( + properties.getInstanceId(), properties.getKafka().getDataChannelTopic(), producer, dataChannelConsumer, @@ -144,7 +152,8 @@ public class KafkaServicesConfiguration properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), clock, - infoChannel); + infoChannel, + shardingPublisherStrategy); } @Bean @@ -276,6 +285,18 @@ public class KafkaServicesConfiguration return properties; } + @Bean + ShardingPublisherStrategy shardingPublisherStrategy( + ChatBackendProperties properties) + { + String[] parts = properties.getHaproxyRuntimeApi().split(":"); + InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1])); + return new HaproxyShardingPublisherStrategy( + haproxyAddress, + properties.getHaproxyMap(), + properties.getInstanceId()); + } + @Bean ZoneId zoneId() {