From: Kai Moritz Date: Tue, 20 Feb 2024 09:26:25 +0000 (+0100) Subject: refactor: Refined success/error-handling when publishing shard-ownership X-Git-Tag: rebase--2024-02-20--10-29~26 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a16499143b8328573dc34fe58bb7ccb4aad1790a;p=demos%2Fkafka%2Fchat refactor: Refined success/error-handling when publishing shard-ownership --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 2fa4998e..99beb438 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -147,11 +147,16 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener infoChannel.sendShardAssignedEvent(partition); shardingPublisherStrategy .publishOwnership(partition) - .doOnNext(instanceId -> log.info( - "Instance {} was published as owner of shard {}", + .doOnSuccess(instanceId -> log.info( + "Successfully published instance {} as owner of shard {}", instanceId, partition)) - .subscribe(); + .doOnError(throwable -> log.error( + "Could not publish instance {} as owner of shard {}: {}", + instanceId, + partition, + throwable)) + .block(); }); consumer.resume(partitions);