From: Kai Moritz Date: Sun, 17 Sep 2023 09:31:22 +0000 (+0200) Subject: feat: Implemented `HaproxyShardingPublisherStrategy` X-Git-Tag: rebase--2024-02-20--10-29~27 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=374aa894409a51671f5c5b8d7c1c478b5554448d;p=demos%2Fkafka%2Fchat feat: Implemented `HaproxyShardingPublisherStrategy` * Implemented a first simple `ShardingPublisherStrategy`, that uses the https://www.haproxy.com/documentation/haproxy-runtime-api/[HAProxy Runntime API] to publish changed ownerships. * Added configuration-properties `kafka.haproxyRuntimeApi` and `kafka.haproxyMap` to configure the strategy. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index c0b4934e..9a73f6f1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -43,6 +43,8 @@ public class ChatBackendProperties private String infoChannelTopic = "info_channel"; private String dataChannelTopic = "data_channel"; private int numPartitions = 2; + private String haproxyRuntimeApi = "haproxy:8401"; + private String haproxyMap = "/usr/local/etc/haproxy/sharding.map"; } public enum ServiceType { inmemory, kafka } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java new file mode 100644 index 00000000..3caaeb38 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java @@ -0,0 +1,41 @@ +package de.juplo.kafka.chat.backend.implementation.haproxy; + +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + + +@RequiredArgsConstructor +@Slf4j +public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy +{ + private final SocketAddress haproxyAddress; + private final String map; + private final String instanceId; + + + @Override + public Mono publishOwnership(int shard) + { + try + { + SocketChannel socketChannel = SocketChannel.open(haproxyAddress); + String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n"; + byte[] commandBytes = command.getBytes(); + ByteBuffer buffer = ByteBuffer.wrap(commandBytes); + socketChannel.write(buffer); + socketChannel.close(); + return Mono.just(instanceId); + } + catch (IOException e) + { + return Mono.error(e); + } + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index c3027fa9..f8bebd6d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; @@ -21,8 +22,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import reactor.core.publisher.Mono; +import java.net.InetSocketAddress; import java.time.Clock; import java.time.ZoneId; import java.util.HashMap; @@ -285,15 +286,15 @@ public class KafkaServicesConfiguration } @Bean - ShardingPublisherStrategy shardingPublisherStrategy() + ShardingPublisherStrategy shardingPublisherStrategy( + ChatBackendProperties properties) { - return new ShardingPublisherStrategy() { - @Override - public Mono publishOwnership(int shard) - { - return Mono.just(Integer.toString(shard)); - } - }; + String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":"); + InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1])); + return new HaproxyShardingPublisherStrategy( + haproxyAddress, + properties.getKafka().getHaproxyMap(), + properties.getInstanceId()); } @Bean