feat: Introduced a configurable instance-id
authorKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 08:48:03 +0000 (09:48 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 09:35:33 +0000 (10:35 +0100)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java

index ec5c7f5..c0b4934 100644 (file)
@@ -13,6 +13,7 @@ import java.nio.file.Paths;
 @Setter
 public class ChatBackendProperties
 {
+  private String instanceId = "DEV";
   private String allowedOrigins = "http://localhost:4200";
   private int chatroomBufferSize = 8;
   private ServiceType services = ServiceType.inmemory;
index 25df317..d5bc5ce 100644 (file)
@@ -5,13 +5,16 @@ import lombok.Getter;
 
 public class ShardNotOwnedException extends IllegalStateException
 {
+  @Getter
+  private final String instanceId;
   @Getter
   private final int shard;
 
 
-  public ShardNotOwnedException(int shard)
+  public ShardNotOwnedException(String instanceId, int shard)
   {
-    super("This instance does not own the shard " + shard);
+    super("Instance " + instanceId + " does not own the shard " + shard);
+    this.instanceId = instanceId;
     this.shard = shard;
   }
 }
index bc6f103..5b5785e 100644 (file)
@@ -58,6 +58,7 @@ public class InMemoryServicesConfiguration
             properties.getChatroomBufferSize()));
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
     return new ShardedChatHomeService(
+        properties.getInstanceId(),
         chatHomes,
         properties.getInmemory().getShardOwners(),
         strategy);
index c281d9e..ab7f8d4 100644 (file)
@@ -19,6 +19,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ShardedChatHomeService implements ChatHomeService
 {
+  private final String instanceId;
   private final SimpleChatHomeService[] chatHomes;
   private final Set<Integer> ownedShards;
   private final String[] shardOwners;
@@ -26,10 +27,12 @@ public class ShardedChatHomeService implements ChatHomeService
 
 
   public ShardedChatHomeService(
+      String instanceId,
       SimpleChatHomeService[] chatHomes,
       URI[] shardOwners,
       ShardingStrategy shardingStrategy)
   {
+    this.instanceId = instanceId;
     this.chatHomes = chatHomes;
     this.shardOwners = Arrays
         .stream(shardOwners)
@@ -54,7 +57,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = shardingStrategy.selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard].createChatRoom(id, name);
   }
 
@@ -63,7 +66,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomInfo(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
@@ -87,7 +90,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomData(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
index 5a8d494..9cafbaa 100644 (file)
@@ -19,13 +19,13 @@ import reactor.core.publisher.Mono;
 
 import java.time.*;
 import java.util.*;
-import java.util.function.Function;
 import java.util.stream.IntStream;
 
 
 @Slf4j
 public class DataChannel implements Runnable, ConsumerRebalanceListener
 {
+  private final String instanceId;
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
@@ -45,6 +45,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
 
   public DataChannel(
+    String instanceId,
     String topic,
     Producer<String, AbstractMessageTo> producer,
     Consumer<String, AbstractMessageTo> dataChannelConsumer,
@@ -55,9 +56,11 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     InfoChannel infoChannel)
   {
     log.debug(
-        "Creating DataChannel for topic {} with {} partitions",
+        "{}: Creating DataChannel for topic {} with {} partitions",
+        instanceId,
         topic,
         numShards);
+    this.instanceId = instanceId;
     this.topic = topic;
     this.consumer = dataChannelConsumer;
     this.producer = producer;
@@ -291,7 +294,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
     if (!isShardOwned[shard])
     {
-      return Mono.error(new ShardNotOwnedException(shard));
+      return Mono.error(new ShardNotOwnedException(instanceId, shard));
     }
 
     return infoChannel
index cafc775..7bb1ab9 100644 (file)
@@ -140,6 +140,7 @@ public class KafkaServicesConfiguration
       InfoChannel infoChannel)
   {
     return new DataChannel(
+        properties.getInstanceId(),
         properties.getKafka().getDataChannelTopic(),
         producer,
         dataChannelConsumer,
index 1478e7c..01de390 100644 (file)
@@ -259,8 +259,9 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -279,8 +280,9 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -301,8 +303,9 @@ public class ChatBackendControllerTest
     UUID chatroomId = UUID.randomUUID();
     String username = "foo";
     Long messageId = 66l;
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -328,8 +331,9 @@ public class ChatBackendControllerTest
     UUID chatroomId = UUID.randomUUID();
     String username = "foo";
     Long messageId = 66l;
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -352,8 +356,9 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
index 3ff9e9e..b830f30 100644 (file)
@@ -36,6 +36,7 @@ public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
       ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
 
       return new ShardedChatHomeService(
+          "http://instance-0",
           chatHomes,
           IntStream
               .range(0, NUM_SHARDS)