1 package de.juplo.kafka.chat.backend.implementation.haproxy;
3 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Mono;
8 import java.net.SocketAddress;
9 import java.nio.ByteBuffer;
10 import java.nio.channels.SocketChannel;
13 @RequiredArgsConstructor
15 public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy
17 private final SocketAddress haproxyAddress;
18 private final String map;
19 private final String instanceId;
23 public Mono<String> publishOwnership(int shard)
27 SocketChannel socketChannel = SocketChannel.open(haproxyAddress);
28 String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n";
29 byte[] commandBytes = command.getBytes();
30 ByteBuffer buffer = ByteBuffer.wrap(commandBytes);
31 socketChannel.write(buffer);
32 socketChannel.close();
33 return Mono.just(instanceId);