@Setter
public class ChatBackendProperties
{
+ private String instanceId = "DEV";
private String allowedOrigins = "http://localhost:4200";
private int chatroomBufferSize = 8;
private ServiceType services = ServiceType.inmemory;
private KafkaServicesProperties kafka = new KafkaServicesProperties();
private String haproxyRuntimeApi = "haproxy:8401";
private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
- private String haproxyInstanceId = "DEV";
@Getter
public class ShardNotOwnedException extends IllegalStateException
{
+ @Getter
+ private final String instanceId;
@Getter
private final int shard;
- public ShardNotOwnedException(int shard)
+ public ShardNotOwnedException(String instanceId, int shard)
{
- super("This instance does not own the shard " + shard);
+ super("Instance " + instanceId + " does not own the shard " + shard);
+ this.instanceId = instanceId;
this.shard = shard;
}
}
properties.getChatroomBufferSize()));
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
return new ShardedChatHomeService(
+ properties.getInstanceId(),
chatHomes,
properties.getInmemory().getShardOwners(),
strategy);
@Slf4j
public class ShardedChatHomeService implements ChatHomeService
{
+ private final String instanceId;
private final SimpleChatHomeService[] chatHomes;
private final Set<Integer> ownedShards;
private final String[] shardOwners;
public ShardedChatHomeService(
+ String instanceId,
SimpleChatHomeService[] chatHomes,
URI[] shardOwners,
ShardingStrategy shardingStrategy)
{
+ this.instanceId = instanceId;
this.chatHomes = chatHomes;
this.shardOwners = Arrays
.stream(shardOwners)
{
int shard = shardingStrategy.selectShard(id);
return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
+ ? Mono.error(new ShardNotOwnedException(instanceId, shard))
: chatHomes[shard].createChatRoom(id, name);
}
{
int shard = selectShard(id);
return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
+ ? Mono.error(new ShardNotOwnedException(instanceId, shard))
: chatHomes[shard]
.getChatRoomInfo(id)
.onErrorMap(throwable -> throwable instanceof UnknownChatroomException
{
int shard = selectShard(id);
return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
+ ? Mono.error(new ShardNotOwnedException(instanceId, shard))
: chatHomes[shard]
.getChatRoomData(id)
.onErrorMap(throwable -> throwable instanceof UnknownChatroomException
import java.time.*;
import java.util.*;
-import java.util.function.Function;
import java.util.stream.IntStream;
@Slf4j
public class DataChannel implements Runnable, ConsumerRebalanceListener
{
+ private final String instanceId;
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
public DataChannel(
+ String instanceId,
String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
ShardingPublisherStrategy shardingPublisherStrategy)
{
log.debug(
- "Creating DataChannel for topic {} with {} partitions",
+ "{}: Creating DataChannel for topic {} with {} partitions",
+ instanceId,
topic,
numShards);
+ this.instanceId = instanceId;
this.topic = topic;
this.consumer = dataChannelConsumer;
this.producer = producer;
if (!isShardOwned[shard])
{
- return Mono.error(new ShardNotOwnedException(shard));
+ return Mono.error(new ShardNotOwnedException(instanceId, shard));
}
return infoChannel
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.web.reactive.function.client.WebClient;
import java.net.InetSocketAddress;
import java.time.Clock;
ShardingPublisherStrategy shardingPublisherStrategy)
{
return new DataChannel(
+ properties.getInstanceId(),
properties.getKafka().getDataChannelTopic(),
producer,
dataChannelConsumer,
return new HaproxyShardingPublisherStrategy(
haproxyAddress,
properties.getHaproxyMap(),
- properties.getHaproxyInstanceId());
+ properties.getInstanceId());
}
@Bean
{
// Given
UUID chatroomId = UUID.randomUUID();
+ String instanceId = "peter";
int shard = 666;
- when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
+ String instanceId = "peter";
int shard = 666;
- when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
+ String instanceId = "peter";
int shard = 666;
- when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
+ String instanceId = "peter";
int shard = 666;
- when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
+ String instanceId = "peter";
int shard = 666;
- when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
// When
WebTestClient.ResponseSpec responseSpec = client
ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
return new ShardedChatHomeService(
+ "http://instance-0",
chatHomes,
IntStream
.range(0, NUM_SHARDS)