From: Kai Moritz Date: Tue, 19 Mar 2024 09:45:35 +0000 (+0100) Subject: feat: Shard-ownership is published asynchronously X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=c8a03c4607a4078a6f157d4ff232cf48fae02615;p=demos%2Fkafka%2Fchat feat: Shard-ownership is published asynchronously --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index abe51f4a..9be75533 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -154,17 +154,16 @@ public class DataChannel implements Channel, ConsumerRebalanceListener channelMediator.shardAssigned(partition); shardingPublisherStrategy .publishOwnership(partition) - .doOnSuccess(instanceId -> log.info( - "Successfully published instance {} as owner of shard {}", - instanceId, - partition)) .doOnError(throwable -> log.error( "Could not publish instance {} as owner of shard {}: {}", instanceId, partition, throwable.toString())) .onErrorComplete() - .block(); + .subscribe(instanceId -> log.info( + "Successfully published instance {} as owner of shard {}", + instanceId, + partition)); }); consumer.resume(partitions);