channelMediator.shardAssigned(partition);
shardingPublisherStrategy
.publishOwnership(partition)
- .doOnSuccess(instanceId -> log.info(
- "Successfully published instance {} as owner of shard {}",
- instanceId,
- partition))
.doOnError(throwable -> log.error(
"Could not publish instance {} as owner of shard {}: {}",
instanceId,
partition,
throwable.toString()))
.onErrorComplete()
- .block();
+ .subscribe(instanceId -> log.info(
+ "Successfully published instance {} as owner of shard {}",
+ instanceId,
+ partition));
});
consumer.resume(partitions);