private final Clock clock;
private final int historyLimit;
private Sinks.Many<Message> sink;
- private volatile boolean active = true;
+ private volatile boolean active = false;
public ChatRoomData(
public void activate()
{
+ if (active)
+ {
+ log.info("{} is already active!", service.getChatRoomId());
+ return;
+ }
+
log.info("{} is being activated", service.getChatRoomId());
this.sink = createSink();
active = true;
isShardOwned[partition] = true;
this.currentOffset[partition] = currentOffset;
- chatRoomData[partition]
- .values()
- .forEach(chatRoomData -> chatRoomData.activate());
-
log.info(
"Partition assigned: {} - loading messages: next={} -> current={}",
partition,
{
log.info("Loading of messages completed! Pausing all owned partitions...");
pauseAllOwnedPartions();
+ activateAllOwnedChatRooms();
log.info("Resuming normal operations...");
channelState = ChannelState.READY;
}
.toList());
}
+ private void activateAllOwnedChatRooms()
+ {
+ IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .forEach(shard -> chatRoomData[shard]
+ .values()
+ .forEach(chatRoomData -> chatRoomData.activate()));
+ }
+
int[] getOwnedShards()
{