WIP:haproxy
authorKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 09:36:48 +0000 (11:36 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 09:36:48 +0000 (11:36 +0200)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

index ec5c7f5..de2dc0d 100644 (file)
@@ -18,6 +18,8 @@ public class ChatBackendProperties
   private ServiceType services = ServiceType.inmemory;
   private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
   private KafkaServicesProperties kafka = new KafkaServicesProperties();
+  private URI haproxyRuntimeApiUri = URI.create("haproxy:8401");
+  private String haproxyInstanceId = "DEV";
 
 
   @Getter
index 7795516..4f646f6 100644 (file)
@@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
@@ -20,6 +22,7 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.web.reactive.function.client.WebClient;
 
 import java.time.Clock;
 import java.time.ZoneId;
@@ -135,7 +138,8 @@ public class KafkaServicesConfiguration
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock,
-      InfoChannel infoChannel)
+      InfoChannel infoChannel,
+      ShardingPublisherStrategy shardingPublisherStrategy)
   {
     return new DataChannel(
         properties.getKafka().getDataChannelTopic(),
@@ -145,7 +149,8 @@ public class KafkaServicesConfiguration
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
         clock,
-        infoChannel);
+        infoChannel,
+        shardingPublisherStrategy);
   }
 
   @Bean
@@ -277,6 +282,16 @@ public class KafkaServicesConfiguration
     return properties;
   }
 
+  @Bean
+  ShardingPublisherStrategy shardingPublisherStrategy(
+      ChatBackendProperties properties,
+      WebClient webClient)
+  {
+    return new HaproxyShardingPublisherStrategy(
+        properties.getHaproxyRuntimeApiUri(),
+        webClient);
+  }
+
   @Bean
   ZoneId zoneId()
   {