From: Kai Moritz Date: Sun, 17 Mar 2024 21:00:08 +0000 (+0100) Subject: refactor: Renamed `ShardingStrategy` for HAProxy Runtime API -- MOVE X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=dcfa67d58e160a3b0e17a93b31eb095f6e017b93;p=demos%2Fkafka%2Fchat refactor: Renamed `ShardingStrategy` for HAProxy Runtime API -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyRuntimeApiShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyRuntimeApiShardingPublisherStrategy.java new file mode 100644 index 00000000..ad71d497 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyRuntimeApiShardingPublisherStrategy.java @@ -0,0 +1,40 @@ +package de.juplo.kafka.chat.backend.implementation.haproxy; + +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + + +@RequiredArgsConstructor +@Slf4j +public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy +{ + private final SocketAddress haproxyAddress; + private final String map; + private final String instanceId; + + + @Override + public Mono publishOwnership(int shard) + { + try + { + SocketChannel socketChannel = SocketChannel.open(haproxyAddress); + String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n"; + byte[] commandBytes = command.getBytes(); + ByteBuffer buffer = ByteBuffer.wrap(commandBytes); + socketChannel.write(buffer); + socketChannel.close(); + return Mono.just(instanceId); + } + catch (Exception e) + { + return Mono.error(e); + } + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java deleted file mode 100644 index ad71d497..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java +++ /dev/null @@ -1,40 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.haproxy; - -import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Mono; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - - -@RequiredArgsConstructor -@Slf4j -public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy -{ - private final SocketAddress haproxyAddress; - private final String map; - private final String instanceId; - - - @Override - public Mono publishOwnership(int shard) - { - try - { - SocketChannel socketChannel = SocketChannel.open(haproxyAddress); - String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n"; - byte[] commandBytes = command.getBytes(); - ByteBuffer buffer = ByteBuffer.wrap(commandBytes); - socketChannel.write(buffer); - socketChannel.close(); - return Mono.just(instanceId); - } - catch (Exception e) - { - return Mono.error(e); - } - } -}