WIP:haproxy
authorKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 09:31:22 +0000 (11:31 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 3 Feb 2024 14:11:18 +0000 (15:11 +0100)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ShardNotOwnedException.java
src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java

index ec5c7f5..21330e1 100644 (file)
@@ -13,11 +13,14 @@ import java.nio.file.Paths;
 @Setter
 public class ChatBackendProperties
 {
+  private String instanceId = "DEV";
   private String allowedOrigins = "http://localhost:4200";
   private int chatroomBufferSize = 8;
   private ServiceType services = ServiceType.inmemory;
   private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
   private KafkaServicesProperties kafka = new KafkaServicesProperties();
+  private String haproxyRuntimeApi = "haproxy:8401";
+  private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
 
 
   @Getter
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java
new file mode 100644 (file)
index 0000000..9a1c725
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import reactor.core.publisher.Mono;
+
+
+public interface ShardingPublisherStrategy
+{
+  Mono<String> publishOwnership(int shard);
+}
index 25df317..d5bc5ce 100644 (file)
@@ -5,13 +5,16 @@ import lombok.Getter;
 
 public class ShardNotOwnedException extends IllegalStateException
 {
+  @Getter
+  private final String instanceId;
   @Getter
   private final int shard;
 
 
-  public ShardNotOwnedException(int shard)
+  public ShardNotOwnedException(String instanceId, int shard)
   {
-    super("This instance does not own the shard " + shard);
+    super("Instance " + instanceId + " does not own the shard " + shard);
+    this.instanceId = instanceId;
     this.shard = shard;
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java
new file mode 100644 (file)
index 0000000..3caaeb3
--- /dev/null
@@ -0,0 +1,41 @@
+package de.juplo.kafka.chat.backend.implementation.haproxy;
+
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy
+{
+  private final SocketAddress haproxyAddress;
+  private final String map;
+  private final String instanceId;
+
+
+  @Override
+  public Mono<String> publishOwnership(int shard)
+  {
+    try
+    {
+      SocketChannel socketChannel = SocketChannel.open(haproxyAddress);
+      String command = "set map " + map + " " + Integer.toString(shard) + " " + instanceId + "\n";
+      byte[] commandBytes = command.getBytes();
+      ByteBuffer buffer = ByteBuffer.wrap(commandBytes);
+      socketChannel.write(buffer);
+      socketChannel.close();
+      return Mono.just(instanceId);
+    }
+    catch (IOException e)
+    {
+      return Mono.error(e);
+    }
+  }
+}
index bc6f103..5b5785e 100644 (file)
@@ -58,6 +58,7 @@ public class InMemoryServicesConfiguration
             properties.getChatroomBufferSize()));
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
     return new ShardedChatHomeService(
+        properties.getInstanceId(),
         chatHomes,
         properties.getInmemory().getShardOwners(),
         strategy);
index c281d9e..ab7f8d4 100644 (file)
@@ -19,6 +19,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public class ShardedChatHomeService implements ChatHomeService
 {
+  private final String instanceId;
   private final SimpleChatHomeService[] chatHomes;
   private final Set<Integer> ownedShards;
   private final String[] shardOwners;
@@ -26,10 +27,12 @@ public class ShardedChatHomeService implements ChatHomeService
 
 
   public ShardedChatHomeService(
+      String instanceId,
       SimpleChatHomeService[] chatHomes,
       URI[] shardOwners,
       ShardingStrategy shardingStrategy)
   {
+    this.instanceId = instanceId;
     this.chatHomes = chatHomes;
     this.shardOwners = Arrays
         .stream(shardOwners)
@@ -54,7 +57,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = shardingStrategy.selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard].createChatRoom(id, name);
   }
 
@@ -63,7 +66,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomInfo(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
@@ -87,7 +90,7 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     int shard = selectShard(id);
     return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
+        ? Mono.error(new ShardNotOwnedException(instanceId, shard))
         : chatHomes[shard]
             .getChatRoomData(id)
             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
index 5a8d494..2fa4998 100644 (file)
@@ -19,13 +19,13 @@ import reactor.core.publisher.Mono;
 
 import java.time.*;
 import java.util.*;
-import java.util.function.Function;
 import java.util.stream.IntStream;
 
 
 @Slf4j
 public class DataChannel implements Runnable, ConsumerRebalanceListener
 {
+  private final String instanceId;
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
@@ -38,6 +38,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomData>[] chatRoomData;
   private final InfoChannel infoChannel;
+  private final ShardingPublisherStrategy shardingPublisherStrategy;
 
   private boolean running;
   @Getter
@@ -45,6 +46,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
 
   public DataChannel(
+    String instanceId,
     String topic,
     Producer<String, AbstractMessageTo> producer,
     Consumer<String, AbstractMessageTo> dataChannelConsumer,
@@ -52,12 +54,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     int numShards,
     int bufferSize,
     Clock clock,
-    InfoChannel infoChannel)
+    InfoChannel infoChannel,
+    ShardingPublisherStrategy shardingPublisherStrategy)
   {
     log.debug(
-        "Creating DataChannel for topic {} with {} partitions",
+        "{}: Creating DataChannel for topic {} with {} partitions",
+        instanceId,
         topic,
         numShards);
+    this.instanceId = instanceId;
     this.topic = topic;
     this.consumer = dataChannelConsumer;
     this.producer = producer;
@@ -73,6 +78,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         .range(0, numShards)
         .forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
     this.infoChannel = infoChannel;
+    this.shardingPublisherStrategy = shardingPublisherStrategy;
   }
 
 
@@ -139,6 +145,13 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
       consumer.seek(topicPartition, nextOffset[partition]);
       infoChannel.sendShardAssignedEvent(partition);
+      shardingPublisherStrategy
+          .publishOwnership(partition)
+          .doOnNext(instanceId -> log.info(
+              "Instance {} was published as owner of shard {}",
+              instanceId,
+              partition))
+          .subscribe();
     });
 
     consumer.resume(partitions);
@@ -291,7 +304,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
     if (!isShardOwned[shard])
     {
-      return Mono.error(new ShardNotOwnedException(shard));
+      return Mono.error(new ShardNotOwnedException(instanceId, shard));
     }
 
     return infoChannel
index cafc775..54c4830 100644 (file)
@@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
@@ -21,6 +23,7 @@ import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
+import java.net.InetSocketAddress;
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.HashMap;
@@ -137,9 +140,11 @@ public class KafkaServicesConfiguration
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock,
-      InfoChannel infoChannel)
+      InfoChannel infoChannel,
+      ShardingPublisherStrategy shardingPublisherStrategy)
   {
     return new DataChannel(
+        properties.getInstanceId(),
         properties.getKafka().getDataChannelTopic(),
         producer,
         dataChannelConsumer,
@@ -147,7 +152,8 @@ public class KafkaServicesConfiguration
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
         clock,
-        infoChannel);
+        infoChannel,
+        shardingPublisherStrategy);
   }
 
   @Bean
@@ -279,6 +285,18 @@ public class KafkaServicesConfiguration
     return properties;
   }
 
+  @Bean
+  ShardingPublisherStrategy shardingPublisherStrategy(
+      ChatBackendProperties properties)
+  {
+    String[] parts = properties.getHaproxyRuntimeApi().split(":");
+    InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
+    return new HaproxyShardingPublisherStrategy(
+        haproxyAddress,
+        properties.getHaproxyMap(),
+        properties.getInstanceId());
+  }
+
   @Bean
   ZoneId zoneId()
   {
index 1478e7c..01de390 100644 (file)
@@ -259,8 +259,9 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -279,8 +280,9 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -301,8 +303,9 @@ public class ChatBackendControllerTest
     UUID chatroomId = UUID.randomUUID();
     String username = "foo";
     Long messageId = 66l;
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -328,8 +331,9 @@ public class ChatBackendControllerTest
     UUID chatroomId = UUID.randomUUID();
     String username = "foo";
     Long messageId = 66l;
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
@@ -352,8 +356,9 @@ public class ChatBackendControllerTest
   {
     // Given
     UUID chatroomId = UUID.randomUUID();
+    String instanceId = "peter";
     int shard = 666;
-    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+    when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(instanceId, shard));
 
     // When
     WebTestClient.ResponseSpec responseSpec = client
index 3ff9e9e..b830f30 100644 (file)
@@ -36,6 +36,7 @@ public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
       ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
 
       return new ShardedChatHomeService(
+          "http://instance-0",
           chatHomes,
           IntStream
               .range(0, NUM_SHARDS)
index 956d7ce..52a527d 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
@@ -10,6 +11,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
+import reactor.core.publisher.Mono;
 
 import java.time.Clock;
 import java.util.List;
@@ -23,6 +25,12 @@ public class KafkaTestUtils
   @Import(KafkaServicesConfiguration.class)
   public static class KafkaTestConfiguration
   {
+    @Bean
+    public ShardingPublisherStrategy shardingPublisherStrategy()
+    {
+      return shard -> Mono.just("MOCKED!");
+    }
+
     @Bean
     public WorkAssignor dataChannelWorkAssignor(
         ChatBackendProperties properties,