From cc219a1c04db3f60a2840f2d1815c59985f0266d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 17 Sep 2023 11:36:48 +0200 Subject: [PATCH] WIP:haproxy --- .../chat/backend/ChatBackendProperties.java | 2 ++ .../kafka/KafkaServicesConfiguration.java | 19 +++++++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index ec5c7f5f..de2dc0d9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -18,6 +18,8 @@ public class ChatBackendProperties private ServiceType services = ServiceType.inmemory; private InMemoryServicesProperties inmemory = new InMemoryServicesProperties(); private KafkaServicesProperties kafka = new KafkaServicesProperties(); + private URI haproxyRuntimeApiUri = URI.create("haproxy:8401"); + private String haproxyInstanceId = "DEV"; @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 77955168..4f646f60 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; @@ -20,6 +22,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.web.reactive.function.client.WebClient; import java.time.Clock; import java.time.ZoneId; @@ -135,7 +138,8 @@ public class KafkaServicesConfiguration Consumer dataChannelConsumer, ZoneId zoneId, Clock clock, - InfoChannel infoChannel) + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { return new DataChannel( properties.getKafka().getDataChannelTopic(), @@ -145,7 +149,8 @@ public class KafkaServicesConfiguration properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), clock, - infoChannel); + infoChannel, + shardingPublisherStrategy); } @Bean @@ -277,6 +282,16 @@ public class KafkaServicesConfiguration return properties; } + @Bean + ShardingPublisherStrategy shardingPublisherStrategy( + ChatBackendProperties properties, + WebClient webClient) + { + return new HaproxyShardingPublisherStrategy( + properties.getHaproxyRuntimeApiUri(), + webClient); + } + @Bean ZoneId zoneId() { -- 2.20.1