From fd824d87628150fa52a046f233187404fe57366b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 17 Sep 2023 11:31:22 +0200 Subject: [PATCH] WIP:haproxy --- .../domain/ShardingPublisherStrategy.java | 9 +++++++ .../HaproxyShardingPublisherStrategy.java | 25 +++++++++++++++++++ .../implementation/kafka/DataChannel.java | 6 ++++- 3 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java new file mode 100644 index 00000000..59fd80cb --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.chat.backend.domain; + +import reactor.core.publisher.Mono; + + +public interface ShardingPublisherStrategy +{ + Mono publishOwnership(int shard); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java new file mode 100644 index 00000000..296d64fe --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java @@ -0,0 +1,25 @@ +package de.juplo.kafka.chat.backend.implementation.haproxy; + +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + +import java.net.URI; + + +@RequiredArgsConstructor +@Slf4j +public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrategy +{ + private final URI runtimeApiUri; + private final WebClient webClient; + + + @Override + public Mono publishOwnership(int shard) + { + return Mono.error(new RuntimeException("TODO")); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 4eedeb4a..e0d42d42 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -38,6 +38,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] nextOffset; private final Map[] chatRoomData; private final InfoChannel infoChannel; + private final ShardingPublisherStrategy shardingPublisherStrategy; private boolean running; @Getter @@ -52,7 +53,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int numShards, int bufferSize, Clock clock, - InfoChannel infoChannel) + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { log.debug( "Creating DataChannel for topic {} with {} partitions", @@ -73,6 +75,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener .range(0, numShards) .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); this.infoChannel = infoChannel; + this.shardingPublisherStrategy = shardingPublisherStrategy; } @@ -139,6 +142,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener consumer.seek(topicPartition, nextOffset[partition]); infoChannel.sendShardAssignedEvent(partition); + shardingPublisherStrategy.publishOwnership(partition); }); consumer.resume(partitions); -- 2.20.1