WIP:haproxy
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index cafc775..54c4830 100644 (file)
@@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
@@ -21,6 +23,7 @@ import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
+import java.net.InetSocketAddress;
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.HashMap;
@@ -137,9 +140,11 @@ public class KafkaServicesConfiguration
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock,
-      InfoChannel infoChannel)
+      InfoChannel infoChannel,
+      ShardingPublisherStrategy shardingPublisherStrategy)
   {
     return new DataChannel(
+        properties.getInstanceId(),
         properties.getKafka().getDataChannelTopic(),
         producer,
         dataChannelConsumer,
@@ -147,7 +152,8 @@ public class KafkaServicesConfiguration
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
         clock,
-        infoChannel);
+        infoChannel,
+        shardingPublisherStrategy);
   }
 
   @Bean
@@ -279,6 +285,18 @@ public class KafkaServicesConfiguration
     return properties;
   }
 
+  @Bean
+  ShardingPublisherStrategy shardingPublisherStrategy(
+      ChatBackendProperties properties)
+  {
+    String[] parts = properties.getHaproxyRuntimeApi().split(":");
+    InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
+    return new HaproxyShardingPublisherStrategy(
+        haproxyAddress,
+        properties.getHaproxyMap(),
+        properties.getInstanceId());
+  }
+
   @Bean
   ZoneId zoneId()
   {