WIP:haproxy
authorKai Moritz <kai@juplo.de>
Fri, 22 Sep 2023 14:56:06 +0000 (16:56 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 22 Sep 2023 14:56:06 +0000 (16:56 +0200)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java

index 9ace559..21330e1 100644 (file)
@@ -13,6 +13,7 @@ import java.nio.file.Paths;
 @Setter
 public class ChatBackendProperties
 {
+  private String instanceId = "DEV";
   private String allowedOrigins = "http://localhost:4200";
   private int chatroomBufferSize = 8;
   private ServiceType services = ServiceType.inmemory;
@@ -20,7 +21,6 @@ public class ChatBackendProperties
   private KafkaServicesProperties kafka = new KafkaServicesProperties();
   private String haproxyRuntimeApi = "haproxy:8401";
   private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
-  private String haproxyInstanceId = "DEV";
 
 
   @Getter
index 25df317..d5bc5ce 100644 (file)
@@ -5,13 +5,16 @@ import lombok.Getter;
 
 public class ShardNotOwnedException extends IllegalStateException
 {
+  @Getter
+  private final String instanceId;
   @Getter
   private final int shard;
 
 
-  public ShardNotOwnedException(int shard)
+  public ShardNotOwnedException(String instanceId, int shard)
   {
-    super("This instance does not own the shard " + shard);
+    super("Instance " + instanceId + " does not own the shard " + shard);
+    this.instanceId = instanceId;
     this.shard = shard;
   }
 }
index bc6f103..5b5785e 100644 (file)
@@ -58,6 +58,7 @@ public class InMemoryServicesConfiguration
             properties.getChatroomBufferSize()));
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
     return new ShardedChatHomeService(
+        properties.getInstanceId(),
         chatHomes,
         properties.getInmemory().getShardOwners(),
         strategy);
index c281d9e..ab7f8d4 100644 (file)
@@ -19,6 +19,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ShardedChatHomeService implements ChatHomeService
 {
+  private final String instanceId;
   private final SimpleChatHomeService[] chatHomes;
   private final Set<Integer> ownedShards;
   private final String[] shardOwners;
@@ -26,10 +27,12 @@ public class ShardedChatHomeService implements ChatHomeService
 
 
   public ShardedChatHomeService(
+      String instanceId,
       SimpleChatHomeService[] chatHomes,
       URI[] shardOwners,
       ShardingStrategy shardingStrategy)
   {
+    this.instanceId = instanceId;
     this.chatHomes = chatHomes;
     this.shardOwners = Arrays
         .stream(shardOwners)
@@ -54,7 +57,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = shardingStrategy.selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard].createChatRoom(id, name);
   }
 
@@ -63,7 +66,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomInfo(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
@@ -87,7 +90,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomData(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
index 1a2f1b0..532a3c1 100644 (file)
@@ -19,13 +19,13 @@ import reactor.core.publisher.Mono;
 
 import java.time.*;
 import java.util.*;
-import java.util.function.Function;
 import java.util.stream.IntStream;
 
 
 @Slf4j
 public class DataChannel implements Runnable, ConsumerRebalanceListener
 {
+  private final String instanceId;
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
@@ -46,6 +46,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
 
   public DataChannel(
+    String instanceId,
     String topic,
     Producer<String, AbstractMessageTo> producer,
     Consumer<String, AbstractMessageTo> dataChannelConsumer,
@@ -57,9 +58,11 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     ShardingPublisherStrategy shardingPublisherStrategy)
   {
     log.debug(
-        "Creating DataChannel for topic {} with {} partitions",
+        "{}: Creating DataChannel for topic {} with {} partitions",
+        instanceId,
         topic,
         numShards);
+    this.instanceId = instanceId;
     this.topic = topic;
     this.consumer = dataChannelConsumer;
     this.producer = producer;
@@ -301,7 +304,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
     if (!isShardOwned[shard])
     {
-      return Mono.error(new ShardNotOwnedException(shard));
+      return Mono.error(new ShardNotOwnedException(instanceId, shard));
     }
 
     return infoChannel
index 2d0f4c8..4d1098f 100644 (file)
@@ -22,7 +22,6 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.web.reactive.function.client.WebClient;
 
 import java.net.InetSocketAddress;
 import java.time.Clock;
@@ -143,6 +142,7 @@ public class KafkaServicesConfiguration
       ShardingPublisherStrategy shardingPublisherStrategy)
   {
     return new DataChannel(
+        properties.getInstanceId(),
         properties.getKafka().getDataChannelTopic(),
         producer,
         dataChannelConsumer,
@@ -292,7 +292,7 @@ public class KafkaServicesConfiguration
     return new HaproxyShardingPublisherStrategy(
         haproxyAddress,
         properties.getHaproxyMap(),
-        properties.getHaproxyInstanceId());
+        properties.getInstanceId());
   }
 
   @Bean
index 1478e7c..01de390 100644 (file)
@@ -259,8 +259,9 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -279,8 +280,9 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -301,8 +303,9 @@ public class ChatBackendControllerTest
     UUID chatroomId = UUID.randomUUID();
     String username = "foo";
     Long messageId = 66l;
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -328,8 +331,9 @@ public class ChatBackendControllerTest
     UUID chatroomId = UUID.randomUUID();
     String username = "foo";
     Long messageId = 66l;
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -352,8 +356,9 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
index 3ff9e9e..b830f30 100644 (file)
@@ -36,6 +36,7 @@ public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
       ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
 
       return new ShardedChatHomeService(
+          "http://instance-0",
           chatHomes,
           IntStream
               .range(0, NUM_SHARDS)