consumer.seek(topicPartition, nextOffset[partition]);
infoChannel.sendShardAssignedEvent(partition);
- shardingPublisherStrategy.publishOwnership(partition);
+ shardingPublisherStrategy
+ .publishOwnership(partition)
+ .map(instanceId -> "Instance "
+ + instanceId
+ + " was published as owner of shard "
+ + partition)
+ .doOnError(e -> log.error("Could not publish ownershit for shard " + partition + ": " + e))
+ .subscribe();
});
consumer.resume(partitions);