From a16499143b8328573dc34fe58bb7ccb4aad1790a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 20 Feb 2024 10:26:25 +0100 Subject: [PATCH] refactor: Refined success/error-handling when publishing shard-ownership --- .../backend/implementation/kafka/DataChannel.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 2fa4998e..99beb438 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -147,11 +147,16 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener infoChannel.sendShardAssignedEvent(partition); shardingPublisherStrategy .publishOwnership(partition) - .doOnNext(instanceId -> log.info( - "Instance {} was published as owner of shard {}", + .doOnSuccess(instanceId -> log.info( + "Successfully published instance {} as owner of shard {}", instanceId, partition)) - .subscribe(); + .doOnError(throwable -> log.error( + "Could not publish instance {} as owner of shard {}: {}", + instanceId, + partition, + throwable)) + .block(); }); consumer.resume(partitions); -- 2.20.1