3caaeb383f225276d302f1b295dd9f2205fa738f
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.implementation.haproxy;
2
3 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Mono;
7
8 import java.io.IOException;
9 import java.net.SocketAddress;
10 import java.nio.ByteBuffer;
11 import java.nio.channels.SocketChannel;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy
17 {
18   private final SocketAddress haproxyAddress;
19   private final String map;
20   private final String instanceId;
21
22
23   @Override
24   public Mono<String> publishOwnership(int shard)
25   {
26     try
27     {
28       SocketChannel socketChannel = SocketChannel.open(haproxyAddress);
29       String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n";
30       byte[] commandBytes = command.getBytes();
31       ByteBuffer buffer = ByteBuffer.wrap(commandBytes);
32       socketChannel.write(buffer);
33       socketChannel.close();
34       return Mono.just(instanceId);
35     }
36     catch (IOException e)
37     {
38       return Mono.error(e);
39     }
40   }
41 }