From: Kai Moritz Date: Wed, 20 Mar 2024 23:55:11 +0000 (+0100) Subject: feat: Switched the config to `HaproxyDataPlaneApiShardingPublisherStrategy` X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=02b377b89ddf99981b36b2455c76afd998043f0a;p=demos%2Fkafka%2Fchat feat: Switched the config to `HaproxyDataPlaneApiShardingPublisherStrategy` --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 1d115182..031ae8bd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -48,6 +48,9 @@ public class ChatBackendProperties private int numPartitions = 2; private Duration pollingInterval = Duration.ofSeconds(1); private String haproxyRuntimeApi = "haproxy:8401"; + private String haproxyDataPlaneApi = "http://haproxy:5555/v2/"; + private String haproxyUser = "juplo"; + private String haproxyPassword = "juplo"; private String haproxyMap = "sharding"; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index d9e85b40..7917390c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyRuntimeApiShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; @@ -21,6 +22,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.web.reactive.function.client.WebClient; import java.net.InetSocketAddress; import java.time.Clock; @@ -297,10 +299,14 @@ public class KafkaServicesConfiguration ShardingPublisherStrategy shardingPublisherStrategy( ChatBackendProperties properties) { - String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":"); - InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1])); - return new HaproxyRuntimeApiShardingPublisherStrategy( - haproxyAddress, + return new HaproxyDataPlaneApiShardingPublisherStrategy( + WebClient + .builder() + .baseUrl(properties.getKafka().getHaproxyDataPlaneApi()) + .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth( + properties.getKafka().getHaproxyUser(), + properties.getKafka().getHaproxyPassword())) + .build(), properties.getKafka().getHaproxyMap(), properties.getInstanceId()); }