WIP:haproxy
authorKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 09:31:22 +0000 (11:31 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 17 Sep 2023 09:31:22 +0000 (11:31 +0200)
src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java
new file mode 100644 (file)
index 0000000..59fd80c
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import reactor.core.publisher.Mono;
+
+
+public interface ShardingPublisherStrategy
+{
+  Mono<String[]> publishOwnership(int shard);
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java
new file mode 100644 (file)
index 0000000..296d64f
--- /dev/null
@@ -0,0 +1,25 @@
+package de.juplo.kafka.chat.backend.implementation.haproxy;
+
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+import java.net.URI;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy
+{
+  private final URI runtimeApiUri;
+  private final WebClient webClient;
+
+
+  @Override
+  public Mono<String[]> publishOwnership(int shard)
+  {
+    return Mono.error(new RuntimeException("TODO"));
+  }
+}
index 4eedeb4..e0d42d4 100644 (file)
@@ -38,6 +38,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomData>[] chatRoomData;
   private final InfoChannel infoChannel;
+  private final ShardingPublisherStrategy shardingPublisherStrategy;
 
   private boolean running;
   @Getter
@@ -52,7 +53,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     int numShards,
     int bufferSize,
     Clock clock,
-    InfoChannel infoChannel)
+    InfoChannel infoChannel,
+    ShardingPublisherStrategy shardingPublisherStrategy)
   {
     log.debug(
         "Creating DataChannel for topic {} with {} partitions",
@@ -73,6 +75,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         .range(0, numShards)
         .forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
     this.infoChannel = infoChannel;
+    this.shardingPublisherStrategy = shardingPublisherStrategy;
   }
 
 
@@ -139,6 +142,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
       consumer.seek(topicPartition, nextOffset[partition]);
       infoChannel.sendShardAssignedEvent(partition);
+      shardingPublisherStrategy.publishOwnership(partition);
     });
 
     consumer.resume(partitions);