@Setter
public class ChatBackendProperties
{
+ private String instanceId = "DEV";
private String allowedOrigins = "http://localhost:4200";
private int chatroomBufferSize = 8;
private ServiceType services = ServiceType.inmemory;
private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
private KafkaServicesProperties kafka = new KafkaServicesProperties();
+ private String haproxyRuntimeApi = "haproxy:8401";
+ private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
@Getter
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import reactor.core.publisher.Mono;
+
+
+public interface ShardingPublisherStrategy
+{
+ Mono<String> publishOwnership(int shard);
+}
public class ShardNotOwnedException extends IllegalStateException
{
+ @Getter
+ private final String instanceId;
@Getter
private final int shard;
- public ShardNotOwnedException(int shard)
+ public ShardNotOwnedException(String instanceId, int shard)
{
- super("This instance does not own the shard " + shard);
+ super("Instance " + instanceId + " does not own the shard " + shard);
+ this.instanceId = instanceId;
this.shard = shard;
}
}
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.haproxy;
+
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy
+{
+ private final SocketAddress haproxyAddress;
+ private final String map;
+ private final String instanceId;
+
+
+ @Override
+ public Mono<String> publishOwnership(int shard)
+ {
+ try
+ {
+ SocketChannel socketChannel = SocketChannel.open(haproxyAddress);
+ String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n";
+ byte[] commandBytes = command.getBytes();
+ ByteBuffer buffer = ByteBuffer.wrap(commandBytes);
+ socketChannel.write(buffer);
+ socketChannel.close();
+ return Mono.just(instanceId);
+ }
+ catch (IOException e)
+ {
+ return Mono.error(e);
+ }
+ }
+}
properties.getChatroomBufferSize()));
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
return new ShardedChatHomeService(
+ properties.getInstanceId(),
chatHomes,
properties.getInmemory().getShardOwners(),
strategy);
@Slf4j
public class ShardedChatHomeService implements ChatHomeService
{
+ private final String instanceId;
private final SimpleChatHomeService[] chatHomes;
private final Set<Integer> ownedShards;
private final String[] shardOwners;
public ShardedChatHomeService(
+ String instanceId,
SimpleChatHomeService[] chatHomes,
URI[] shardOwners,
ShardingStrategy shardingStrategy)
{
+ this.instanceId = instanceId;
this.chatHomes = chatHomes;
this.shardOwners = Arrays
.stream(shardOwners)
{
int shard = shardingStrategy.selectShard(id);
return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
+ ? Mono.error(new ShardNotOwnedException(instanceId, shard))
: chatHomes[shard].createChatRoom(id, name);
}
{
int shard = selectShard(id);
return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
+ ? Mono.error(new ShardNotOwnedException(instanceId, shard))
: chatHomes[shard]
.getChatRoomInfo(id)
.onErrorMap(throwable -> throwable instanceof UnknownChatroomException
{
int shard = selectShard(id);
return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
+ ? Mono.error(new ShardNotOwnedException(instanceId, shard))
: chatHomes[shard]
.getChatRoomData(id)
.onErrorMap(throwable -> throwable instanceof UnknownChatroomException
import java.time.*;
import java.util.*;
-import java.util.function.Function;
import java.util.stream.IntStream;
@Slf4j
public class DataChannel implements Runnable, ConsumerRebalanceListener
{
+ private final String instanceId;
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
private final long[] nextOffset;
private final Map<UUID, ChatRoomData>[] chatRoomData;
private final InfoChannel infoChannel;
+ private final ShardingPublisherStrategy shardingPublisherStrategy;
private boolean running;
@Getter
public DataChannel(
+ String instanceId,
String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
int numShards,
int bufferSize,
Clock clock,
- InfoChannel infoChannel)
+ InfoChannel infoChannel,
+ ShardingPublisherStrategy shardingPublisherStrategy)
{
log.debug(
- "Creating DataChannel for topic {} with {} partitions",
+ "{}: Creating DataChannel for topic {} with {} partitions",
+ instanceId,
topic,
numShards);
+ this.instanceId = instanceId;
this.topic = topic;
this.consumer = dataChannelConsumer;
this.producer = producer;
.range(0, numShards)
.forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
this.infoChannel = infoChannel;
+ this.shardingPublisherStrategy = shardingPublisherStrategy;
}
consumer.seek(topicPartition, nextOffset[partition]);
infoChannel.sendShardAssignedEvent(partition);
+ shardingPublisherStrategy
+ .publishOwnership(partition)
+ .doOnNext(instanceId -> log.info(
+ "Instance {} was published as owner of shard {}",
+ instanceId,
+ partition))
+ .subscribe();
});
consumer.resume(partitions);
if (!isShardOwned[shard])
{
- return Mono.error(new ShardNotOwnedException(shard));
+ return Mono.error(new ShardNotOwnedException(instanceId, shard));
}
return infoChannel
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
Consumer<String, AbstractMessageTo> dataChannelConsumer,
ZoneId zoneId,
Clock clock,
- InfoChannel infoChannel)
+ InfoChannel infoChannel,
+ ShardingPublisherStrategy shardingPublisherStrategy)
{
return new DataChannel(
+ properties.getInstanceId(),
properties.getKafka().getDataChannelTopic(),
producer,
dataChannelConsumer,
properties.getKafka().getNumPartitions(),
properties.getChatroomBufferSize(),
clock,
- infoChannel);
+ infoChannel,
+ shardingPublisherStrategy);
}
@Bean
return properties;
}
+ @Bean
+ ShardingPublisherStrategy shardingPublisherStrategy(
+ ChatBackendProperties properties)
+ {
+ String[] parts = properties.getHaproxyRuntimeApi().split(":");
+ InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
+ return new HaproxyShardingPublisherStrategy(
+ haproxyAddress,
+ properties.getHaproxyMap(),
+ properties.getInstanceId());
+ }
+
@Bean
ZoneId zoneId()
{
{
// Given
UUID chatroomId = UUID.randomUUID();
+ String instanceId = "peter";
int shard = 666;
- when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
+ String instanceId = "peter";
int shard = 666;
- when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
+ String instanceId = "peter";
int shard = 666;
- when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
+ String instanceId = "peter";
int shard = 666;
- when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
+ String instanceId = "peter";
int shard = 666;
- when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
// When
WebTestClient.ResponseSpec responseSpec = client
ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
return new ShardedChatHomeService(
+ "http://instance-0",
chatHomes,
IntStream
.range(0, NUM_SHARDS)
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
+import reactor.core.publisher.Mono;
import java.time.Clock;
import java.util.List;
@Import(KafkaServicesConfiguration.class)
public static class KafkaTestConfiguration
{
+ @Bean
+ public ShardingPublisherStrategy shardingPublisherStrategy()
+ {
+ return shard -> Mono.just("MOCKED!");
+ }
+
@Bean
public WorkAssignor dataChannelWorkAssignor(
ChatBackendProperties properties,