From 02b377b89ddf99981b36b2455c76afd998043f0a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 21 Mar 2024 00:55:11 +0100 Subject: [PATCH] feat: Switched the config to `HaproxyDataPlaneApiShardingPublisherStrategy` --- .../kafka/chat/backend/ChatBackendProperties.java | 3 +++ .../kafka/KafkaServicesConfiguration.java | 14 ++++++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 1d115182..031ae8bd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -48,6 +48,9 @@ public class ChatBackendProperties private int numPartitions = 2; private Duration pollingInterval = Duration.ofSeconds(1); private String haproxyRuntimeApi = "haproxy:8401"; + private String haproxyDataPlaneApi = "http://haproxy:5555/v2/"; + private String haproxyUser = "juplo"; + private String haproxyPassword = "juplo"; private String haproxyMap = "sharding"; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index d9e85b40..7917390c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyRuntimeApiShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; @@ -21,6 +22,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.web.reactive.function.client.WebClient; import java.net.InetSocketAddress; import java.time.Clock; @@ -297,10 +299,14 @@ public class KafkaServicesConfiguration ShardingPublisherStrategy shardingPublisherStrategy( ChatBackendProperties properties) { - String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":"); - InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1])); - return new HaproxyRuntimeApiShardingPublisherStrategy( - haproxyAddress, + return new HaproxyDataPlaneApiShardingPublisherStrategy( + WebClient + .builder() + .baseUrl(properties.getKafka().getHaproxyDataPlaneApi()) + .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth( + properties.getKafka().getHaproxyUser(), + properties.getKafka().getHaproxyPassword())) + .build(), properties.getKafka().getHaproxyMap(), properties.getInstanceId()); } -- 2.20.1