isShardOwned[partition] = true;
this.currentOffset[partition] = currentOffset;
- chatRoomData[partition]
- .values()
- .forEach(chatRoomData -> chatRoomData.activate());
-
log.info(
"Partition assigned: {} - loading messages: next={} -> current={}",
partition,
{
log.info("Loading of messages completed! Pausing all owned partitions...");
pauseAllOwnedPartions();
+ activateAllOwnedChatRooms();
log.info("Resuming normal operations...");
channelState = ChannelState.READY;
}
.toList());
}
+ private void activateAllOwnedChatRooms()
+ {
+ IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .forEach(shard -> chatRoomData[shard]
+ .values()
+ .forEach(chatRoomData -> chatRoomData.activate()));
+ }
+
int[] getOwnedShards()
{