feat: Switched the config to `HaproxyDataPlaneApiShardingPublisherStrategy`
authorKai Moritz <kai@juplo.de>
Wed, 20 Mar 2024 23:55:11 +0000 (00:55 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 22 Mar 2024 16:39:20 +0000 (17:39 +0100)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

index 1d11518..031ae8b 100644 (file)
@@ -48,6 +48,9 @@ public class ChatBackendProperties
     private int numPartitions = 2;
     private Duration pollingInterval = Duration.ofSeconds(1);
     private String haproxyRuntimeApi = "haproxy:8401";
+    private String haproxyDataPlaneApi = "http://haproxy:5555/v2/";
+    private String haproxyUser = "juplo";
+    private String haproxyPassword = "juplo";
     private String haproxyMap = "sharding";
   }
 
index d9e85b4..7917390 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyRuntimeApiShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
@@ -21,6 +22,7 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.web.reactive.function.client.WebClient;
 
 import java.net.InetSocketAddress;
 import java.time.Clock;
@@ -297,10 +299,14 @@ public class KafkaServicesConfiguration
   ShardingPublisherStrategy shardingPublisherStrategy(
       ChatBackendProperties properties)
   {
-    String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
-    InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
-    return new HaproxyRuntimeApiShardingPublisherStrategy(
-        haproxyAddress,
+    return new HaproxyDataPlaneApiShardingPublisherStrategy(
+        WebClient
+            .builder()
+            .baseUrl(properties.getKafka().getHaproxyDataPlaneApi())
+            .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(
+                properties.getKafka().getHaproxyUser(),
+                properties.getKafka().getHaproxyPassword()))
+            .build(),
         properties.getKafka().getHaproxyMap(),
         properties.getInstanceId());
   }