infoChannel.sendShardAssignedEvent(partition);
shardingPublisherStrategy
.publishOwnership(partition)
- .doOnNext(instanceId -> log.info(
- "Instance {} was published as owner of shard {}",
+ .doOnSuccess(instanceId -> log.info(
+ "Successfully published instance {} as owner of shard {}",
instanceId,
partition))
- .subscribe();
+ .doOnError(throwable -> log.error(
+ "Could not publish instance {} as owner of shard {}: {}",
+ instanceId,
+ partition,
+ throwable))
+ .block();
});
consumer.resume(partitions);
KafkaChatMessageService kafkaChatRoomService =
(KafkaChatMessageService) chatRoomData.getChatRoomService();
+ log.debug(
+ "Loaded message from partition={} at offset={}: {}",
+ partition,
+ offset,
+ message);
kafkaChatRoomService.persistMessage(message);
}