From 22af46396a941b8602d80310c3544c061da58415 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 17 Sep 2023 12:39:08 +0200 Subject: [PATCH] WIP:haproxy --- .../chat/backend/ChatBackendProperties.java | 1 + .../HaproxyShardingPublisherStrategy.java | 24 ++++++++++++++++--- .../kafka/KafkaServicesConfiguration.java | 6 +++-- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 8f151e2c..9ace559d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -19,6 +19,7 @@ public class ChatBackendProperties private InMemoryServicesProperties inmemory = new InMemoryServicesProperties(); private KafkaServicesProperties kafka = new KafkaServicesProperties(); private String haproxyRuntimeApi = "haproxy:8401"; + private String haproxyMap = "/usr/local/etc/haproxy/sharding.map"; private String haproxyInstanceId = "DEV"; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java index e22d75f8..3caaeb38 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java @@ -5,19 +5,37 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + @RequiredArgsConstructor @Slf4j public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy { - private final String host; - private final int port; + private final SocketAddress haproxyAddress; + private final String map; private final String instanceId; @Override public Mono publishOwnership(int shard) { - return Mono.error(new RuntimeException("TODO")); + try + { + SocketChannel socketChannel = SocketChannel.open(haproxyAddress); + String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n"; + byte[] commandBytes = command.getBytes(); + ByteBuffer buffer = ByteBuffer.wrap(commandBytes); + socketChannel.write(buffer); + socketChannel.close(); + return Mono.just(instanceId); + } + catch (IOException e) + { + return Mono.error(e); + } } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index c6822d67..2d0f4c85 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -24,6 +24,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.web.reactive.function.client.WebClient; +import java.net.InetSocketAddress; import java.time.Clock; import java.time.ZoneId; import java.util.HashMap; @@ -287,9 +288,10 @@ public class KafkaServicesConfiguration ChatBackendProperties properties) { String[] parts = properties.getHaproxyRuntimeApi().split(":"); + InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1])); return new HaproxyShardingPublisherStrategy( - parts[0], - Integer.valueOf(parts[1]), + haproxyAddress, + properties.getHaproxyMap(), properties.getHaproxyInstanceId()); } -- 2.20.1