feat: Shard-ownership is published asynchronously
authorKai Moritz <kai@juplo.de>
Tue, 19 Mar 2024 09:45:35 +0000 (10:45 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 22 Mar 2024 16:39:20 +0000 (17:39 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index abe51f4..9be7553 100644 (file)
@@ -154,17 +154,16 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       channelMediator.shardAssigned(partition);
       shardingPublisherStrategy
           .publishOwnership(partition)
-          .doOnSuccess(instanceId -> log.info(
-              "Successfully published instance {} as owner of shard {}",
-              instanceId,
-              partition))
           .doOnError(throwable -> log.error(
               "Could not publish instance {} as owner of shard {}: {}",
               instanceId,
               partition,
               throwable.toString()))
           .onErrorComplete()
-          .block();
+          .subscribe(instanceId -> log.info(
+              "Successfully published instance {} as owner of shard {}",
+              instanceId,
+              partition));
     });
 
     consumer.resume(partitions);