WIP:haproxy
authorKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 10:39:08 +0000 (12:39 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 10:39:08 +0000 (12:39 +0200)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

index 8f151e2..9ace559 100644 (file)
@@ -19,6 +19,7 @@ public class ChatBackendProperties
   private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
   private KafkaServicesProperties kafka = new KafkaServicesProperties();
   private String haproxyRuntimeApi = "haproxy:8401";
+  private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
   private String haproxyInstanceId = "DEV";
 
 
index e22d75f..3caaeb3 100644 (file)
@@ -5,19 +5,37 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Mono;
 
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
 
 @RequiredArgsConstructor
 @Slf4j
 public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy
 {
-  private final String host;
-  private final int port;
+  private final SocketAddress haproxyAddress;
+  private final String map;
   private final String instanceId;
 
 
   @Override
   public Mono<String> publishOwnership(int shard)
   {
-    return Mono.error(new RuntimeException("TODO"));
+    try
+    {
+      SocketChannel socketChannel = SocketChannel.open(haproxyAddress);
+      String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n";
+      byte[] commandBytes = command.getBytes();
+      ByteBuffer buffer = ByteBuffer.wrap(commandBytes);
+      socketChannel.write(buffer);
+      socketChannel.close();
+      return Mono.just(instanceId);
+    }
+    catch (IOException e)
+    {
+      return Mono.error(e);
+    }
   }
 }
index c6822d6..2d0f4c8 100644 (file)
@@ -24,6 +24,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.web.reactive.function.client.WebClient;
 
+import java.net.InetSocketAddress;
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.HashMap;
@@ -287,9 +288,10 @@ public class KafkaServicesConfiguration
       ChatBackendProperties properties)
   {
     String[] parts = properties.getHaproxyRuntimeApi().split(":");
+    InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
     return new HaproxyShardingPublisherStrategy(
-        parts[0],
-        Integer.valueOf(parts[1]),
+        haproxyAddress,
+        properties.getHaproxyMap(),
         properties.getHaproxyInstanceId());
   }