refactor: Refined success/error-handling when publishing shard-ownership
authorKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 09:26:25 +0000 (10:26 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 09:26:25 +0000 (10:26 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index 2fa4998..99beb43 100644 (file)
@@ -147,11 +147,16 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       infoChannel.sendShardAssignedEvent(partition);
       shardingPublisherStrategy
           .publishOwnership(partition)
-          .doOnNext(instanceId -> log.info(
-              "Instance {} was published as owner of shard {}",
+          .doOnSuccess(instanceId -> log.info(
+              "Successfully published instance {} as owner of shard {}",
               instanceId,
               partition))
-          .subscribe();
+          .doOnError(throwable -> log.error(
+              "Could not publish instance {} as owner of shard {}: {}",
+              instanceId,
+              partition,
+              throwable))
+          .block();
     });
 
     consumer.resume(partitions);