X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;h=f8bebd6dbe3d266348b1274885859e4f3cb6a6de;hb=74501b5661b0a760cf2b45aa055120b0d20ef2e2;hp=c3027fa973a668c68573a0f4fb5a3c56a0abdd8d;hpb=d53ef6cdaa5480e2f6a01499603dcb9f32350c1b;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index c3027fa9..f8bebd6d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; @@ -21,8 +22,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import reactor.core.publisher.Mono; +import java.net.InetSocketAddress; import java.time.Clock; import java.time.ZoneId; import java.util.HashMap; @@ -285,15 +286,15 @@ public class KafkaServicesConfiguration } @Bean - ShardingPublisherStrategy shardingPublisherStrategy() + ShardingPublisherStrategy shardingPublisherStrategy( + ChatBackendProperties properties) { - return new ShardingPublisherStrategy() { - @Override - public Mono publishOwnership(int shard) - { - return Mono.just(Integer.toString(shard)); - } - }; + String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":"); + InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1])); + return new HaproxyShardingPublisherStrategy( + haproxyAddress, + properties.getKafka().getHaproxyMap(), + properties.getInstanceId()); } @Bean