From c8a03c4607a4078a6f157d4ff232cf48fae02615 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Tue, 19 Mar 2024 10:45:35 +0100
Subject: [PATCH] feat: Shard-ownership is published asynchronously

---
 .../chat/backend/implementation/kafka/DataChannel.java   | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
index abe51f4a..9be75533 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
@@ -154,17 +154,16 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       channelMediator.shardAssigned(partition);
       shardingPublisherStrategy
           .publishOwnership(partition)
-          .doOnSuccess(instanceId -> log.info(
-              "Successfully published instance {} as owner of shard {}",
-              instanceId,
-              partition))
           .doOnError(throwable -> log.error(
               "Could not publish instance {} as owner of shard {}: {}",
               instanceId,
               partition,
               throwable.toString()))
           .onErrorComplete()
-          .block();
+          .subscribe(instanceId -> log.info(
+              "Successfully published instance {} as owner of shard {}",
+              instanceId,
+              partition));
     });
 
     consumer.resume(partitions);
-- 
2.20.1