X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=99beb438191d43c3f426e0f9c05e8ca97e3ebda2;hb=27a2baceb8e94d41f14a6cd2598fa65e0ac514b9;hp=2fa4998e72c406afad6705192ef2c535bcd483ac;hpb=bab48fac3824b1dbf660881e5c517dc98ab9cc8f;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 2fa4998e..99beb438 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -147,11 +147,16 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener infoChannel.sendShardAssignedEvent(partition); shardingPublisherStrategy .publishOwnership(partition) - .doOnNext(instanceId -> log.info( - "Instance {} was published as owner of shard {}", + .doOnSuccess(instanceId -> log.info( + "Successfully published instance {} as owner of shard {}", instanceId, partition)) - .subscribe(); + .doOnError(throwable -> log.error( + "Could not publish instance {} as owner of shard {}: {}", + instanceId, + partition, + throwable)) + .block(); }); consumer.resume(partitions);