From df31cb1edc29d872456773e18a32ba7b7442ae6b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 17 Sep 2023 11:52:00 +0200 Subject: [PATCH] WIP:haproxy --- .../chat/backend/domain/ShardingPublisherStrategy.java | 2 +- .../chat/backend/implementation/kafka/DataChannel.java | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java index 59fd80cb..9a1c725e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java @@ -5,5 +5,5 @@ import reactor.core.publisher.Mono; public interface ShardingPublisherStrategy { - Mono publishOwnership(int shard); + Mono publishOwnership(int shard); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index e0d42d42..114619ed 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -142,7 +142,14 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener consumer.seek(topicPartition, nextOffset[partition]); infoChannel.sendShardAssignedEvent(partition); - shardingPublisherStrategy.publishOwnership(partition); + shardingPublisherStrategy + .publishOwnership(partition) + .map(instanceId -> "Instance " + + instanceId + + " was published as owner of shard " + + partition) + .doOnError(e -> log.error("Could not publish ownershit for shard " + partition + ": " + e)) + .subscribe(); }); consumer.resume(partitions); -- 2.20.1