From: Kai Moritz Date: Sun, 17 Sep 2023 09:57:38 +0000 (+0200) Subject: WIP:haproxy X-Git-Tag: rebase--2024-01-26--18-11~10 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f7e6d8ad0f07659219754d229a1c790356bd52ae;p=demos%2Fkafka%2Fchat WIP:haproxy --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java index 7835b188..59917091 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java @@ -18,7 +18,7 @@ public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrate @Override - public Mono publishOwnership(int shard) + public Mono publishOwnership(int shard) { return Mono.error(new RuntimeException("TODO")); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 114619ed..1a2f1b0f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -144,11 +144,10 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener infoChannel.sendShardAssignedEvent(partition); shardingPublisherStrategy .publishOwnership(partition) - .map(instanceId -> "Instance " - + instanceId - + " was published as owner of shard " - + partition) - .doOnError(e -> log.error("Could not publish ownershit for shard " + partition + ": " + e)) + .doOnNext(instanceId -> log.info( + "Instance {} was published as owner of shard {}", + instanceId, + partition)) .subscribe(); });