infoChannel.sendShardAssignedEvent(partition);
shardingPublisherStrategy
.publishOwnership(partition)
- .doOnNext(instanceId -> log.info(
- "Instance {} was published as owner of shard {}",
+ .doOnSuccess(instanceId -> log.info(
+ "Successfully published instance {} as owner of shard {}",
instanceId,
partition))
- .subscribe();
+ .doOnError(throwable -> log.error(
+ "Could not publish instance {} as owner of shard {}: {}",
+ instanceId,
+ partition,
+ throwable))
+ .block();
});
consumer.resume(partitions);