From ec456b2c00027e54a49f3d916e89c831b2589186 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 20 Feb 2024 09:48:03 +0100 Subject: [PATCH] feat: Introduced a configurable instance-id --- .../kafka/chat/backend/ChatBackendProperties.java | 1 + .../domain/exceptions/ShardNotOwnedException.java | 7 +++++-- .../inmemory/InMemoryServicesConfiguration.java | 1 + .../inmemory/ShardedChatHomeService.java | 9 ++++++--- .../backend/implementation/kafka/DataChannel.java | 9 ++++++--- .../kafka/KafkaServicesConfiguration.java | 1 + .../backend/api/ChatBackendControllerTest.java | 15 ++++++++++----- .../inmemory/ShardedChatHomeServiceTest.java | 1 + 8 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index ec5c7f5f..c0b4934e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -13,6 +13,7 @@ import java.nio.file.Paths; @Setter public class ChatBackendProperties { + private String instanceId = "DEV"; private String allowedOrigins = "http://localhost:4200"; private int chatroomBufferSize = 8; private ServiceType services = ServiceType.inmemory; diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java index 25df317b..d5bc5ce4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java @@ -5,13 +5,16 @@ import lombok.Getter; public class ShardNotOwnedException extends IllegalStateException { + @Getter + private final String instanceId; @Getter private final int shard; - public ShardNotOwnedException(int shard) + public ShardNotOwnedException(String instanceId, int shard) { - super("This instance does not own the shard " + shard); + super("Instance " + instanceId + " does not own the shard " + shard); + this.instanceId = instanceId; this.shard = shard; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index bc6f103b..5b5785ea 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -58,6 +58,7 @@ public class InMemoryServicesConfiguration properties.getChatroomBufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHomeService( + properties.getInstanceId(), chatHomes, properties.getInmemory().getShardOwners(), strategy); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java index c281d9e1..ab7f8d43 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java @@ -19,6 +19,7 @@ import java.util.stream.Collectors; @Slf4j public class ShardedChatHomeService implements ChatHomeService { + private final String instanceId; private final SimpleChatHomeService[] chatHomes; private final Set ownedShards; private final String[] shardOwners; @@ -26,10 +27,12 @@ public class ShardedChatHomeService implements ChatHomeService public ShardedChatHomeService( + String instanceId, SimpleChatHomeService[] chatHomes, URI[] shardOwners, ShardingStrategy shardingStrategy) { + this.instanceId = instanceId; this.chatHomes = chatHomes; this.shardOwners = Arrays .stream(shardOwners) @@ -54,7 +57,7 @@ public class ShardedChatHomeService implements ChatHomeService { int shard = shardingStrategy.selectShard(id); return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) + ? Mono.error(new ShardNotOwnedException(instanceId, shard)) : chatHomes[shard].createChatRoom(id, name); } @@ -63,7 +66,7 @@ public class ShardedChatHomeService implements ChatHomeService { int shard = selectShard(id); return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) + ? Mono.error(new ShardNotOwnedException(instanceId, shard)) : chatHomes[shard] .getChatRoomInfo(id) .onErrorMap(throwable -> throwable instanceof UnknownChatroomException @@ -87,7 +90,7 @@ public class ShardedChatHomeService implements ChatHomeService { int shard = selectShard(id); return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) + ? Mono.error(new ShardNotOwnedException(instanceId, shard)) : chatHomes[shard] .getChatRoomData(id) .onErrorMap(throwable -> throwable instanceof UnknownChatroomException diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 5a8d4944..9cafbaaa 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -19,13 +19,13 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; -import java.util.function.Function; import java.util.stream.IntStream; @Slf4j public class DataChannel implements Runnable, ConsumerRebalanceListener { + private final String instanceId; private final String topic; private final Producer producer; private final Consumer consumer; @@ -45,6 +45,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public DataChannel( + String instanceId, String topic, Producer producer, Consumer dataChannelConsumer, @@ -55,9 +56,11 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener InfoChannel infoChannel) { log.debug( - "Creating DataChannel for topic {} with {} partitions", + "{}: Creating DataChannel for topic {} with {} partitions", + instanceId, topic, numShards); + this.instanceId = instanceId; this.topic = topic; this.consumer = dataChannelConsumer; this.producer = producer; @@ -291,7 +294,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener if (!isShardOwned[shard]) { - return Mono.error(new ShardNotOwnedException(shard)); + return Mono.error(new ShardNotOwnedException(instanceId, shard)); } return infoChannel diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index cafc7757..7bb1ab97 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -140,6 +140,7 @@ public class KafkaServicesConfiguration InfoChannel infoChannel) { return new DataChannel( + properties.getInstanceId(), properties.getKafka().getDataChannelTopic(), producer, dataChannelConsumer, diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index 1478e7c5..01de390c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -259,8 +259,9 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -279,8 +280,9 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -301,8 +303,9 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -328,8 +331,9 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -352,8 +356,9 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); + String instanceId = "peter"; int shard = 666; - when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard)); // When WebTestClient.ResponseSpec responseSpec = client diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java index 3ff9e9e1..b830f300 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java @@ -36,6 +36,7 @@ public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); return new ShardedChatHomeService( + "http://instance-0", chatHomes, IntStream .range(0, NUM_SHARDS) -- 2.20.1