private final long[] nextOffset;
private final Map<UUID, ChatRoomData>[] chatRoomData;
private final InfoChannel infoChannel;
+ private final ShardingPublisherStrategy shardingPublisherStrategy;
private boolean running;
@Getter
int numShards,
int bufferSize,
Clock clock,
- InfoChannel infoChannel)
+ InfoChannel infoChannel,
+ ShardingPublisherStrategy shardingPublisherStrategy)
{
log.debug(
"{}: Creating DataChannel for topic {} with {} partitions",
.range(0, numShards)
.forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
this.infoChannel = infoChannel;
+ this.shardingPublisherStrategy = shardingPublisherStrategy;
}
consumer.seek(topicPartition, nextOffset[partition]);
infoChannel.sendShardAssignedEvent(partition);
+ shardingPublisherStrategy
+ .publishOwnership(partition)
+ .doOnSuccess(instanceId -> log.info(
+ "Successfully published instance {} as owner of shard {}",
+ instanceId,
+ partition))
+ .doOnError(throwable -> log.error(
+ "Could not publish instance {} as owner of shard {}: {}",
+ instanceId,
+ partition,
+ throwable))
+ .block();
});
consumer.resume(partitions);