private String infoChannelTopic = "info_channel";
private String dataChannelTopic = "data_channel";
private int numPartitions = 2;
+ private String haproxyRuntimeApi = "haproxy:8401";
+ private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
}
public enum ServiceType { inmemory, kafka }
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.haproxy;
+
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy
+{
+ private final SocketAddress haproxyAddress;
+ private final String map;
+ private final String instanceId;
+
+
+ @Override
+ public Mono<String> publishOwnership(int shard)
+ {
+ try
+ {
+ SocketChannel socketChannel = SocketChannel.open(haproxyAddress);
+ String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n";
+ byte[] commandBytes = command.getBytes();
+ ByteBuffer buffer = ByteBuffer.wrap(commandBytes);
+ socketChannel.write(buffer);
+ socketChannel.close();
+ return Mono.just(instanceId);
+ }
+ catch (IOException e)
+ {
+ return Mono.error(e);
+ }
+ }
+}
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import reactor.core.publisher.Mono;
+import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
}
@Bean
- ShardingPublisherStrategy shardingPublisherStrategy()
+ ShardingPublisherStrategy shardingPublisherStrategy(
+ ChatBackendProperties properties)
{
- return new ShardingPublisherStrategy() {
- @Override
- public Mono<String> publishOwnership(int shard)
- {
- return Mono.just(Integer.toString(shard));
- }
- };
+ String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
+ InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
+ return new HaproxyShardingPublisherStrategy(
+ haproxyAddress,
+ properties.getKafka().getHaproxyMap(),
+ properties.getInstanceId());
}
@Bean