private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
private KafkaServicesProperties kafka = new KafkaServicesProperties();
private String haproxyRuntimeApi = "haproxy:8401";
+ private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
private String haproxyInstanceId = "DEV";
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
@RequiredArgsConstructor
@Slf4j
public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy
{
- private final String host;
- private final int port;
+ private final SocketAddress haproxyAddress;
+ private final String map;
private final String instanceId;
@Override
public Mono<String> publishOwnership(int shard)
{
- return Mono.error(new RuntimeException("TODO"));
+ try
+ {
+ SocketChannel socketChannel = SocketChannel.open(haproxyAddress);
+ String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n";
+ byte[] commandBytes = command.getBytes();
+ ByteBuffer buffer = ByteBuffer.wrap(commandBytes);
+ socketChannel.write(buffer);
+ socketChannel.close();
+ return Mono.just(instanceId);
+ }
+ catch (IOException e)
+ {
+ return Mono.error(e);
+ }
}
}
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.reactive.function.client.WebClient;
+import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
ChatBackendProperties properties)
{
String[] parts = properties.getHaproxyRuntimeApi().split(":");
+ InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
return new HaproxyShardingPublisherStrategy(
- parts[0],
- Integer.valueOf(parts[1]),
+ haproxyAddress,
+ properties.getHaproxyMap(),
properties.getHaproxyInstanceId());
}