private final ZoneId zoneId;
private final int numShards;
private final Duration pollingInterval;
- private final int bufferSize;
+ private final int historyLimit;
private final Clock clock;
private final boolean[] isShardOwned;
private final long[] currentOffset;
ZoneId zoneId,
int numShards,
Duration pollingInterval,
- int bufferSize,
+ int historyLimit,
Clock clock,
ChannelMediator channelMediator,
ShardingPublisherStrategy shardingPublisherStrategy)
this.zoneId = zoneId;
this.numShards = numShards;
this.pollingInterval = pollingInterval;
- this.bufferSize = bufferSize;
+ this.historyLimit = historyLimit;
this.clock = clock;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
int partition = topicPartition.partition();
isShardOwned[partition] = false;
nextOffset[partition] = consumer.position(topicPartition);
+
log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+
+ chatRoomData[partition]
+ .values()
+ .forEach(chatRoomData -> chatRoomData.deactivate());
+
channelMediator.shardRevoked(partition);
});
}
{
log.info("Loading of messages completed! Pausing all owned partitions...");
pauseAllOwnedPartions();
+ activateAllOwnedChatRooms();
log.info("Resuming normal operations...");
channelState = ChannelState.READY;
}
.toList());
}
+ private void activateAllOwnedChatRooms()
+ {
+ IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .forEach(shard -> chatRoomData[shard]
+ .values()
+ .forEach(chatRoomData -> chatRoomData.activate()));
+ }
+
int[] getOwnedShards()
{
void createChatRoomData(ChatRoomInfo chatRoomInfo)
{
- computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard());
+ int shard = chatRoomInfo.getShard();
+
+ ChatRoomData chatRoomData = computeChatRoomData(
+ chatRoomInfo.getId(),
+ chatRoomInfo.getShard());
+
+ // TODO: Possible race-condition in case of an ongoing rebalance!
+ if (isShardOwned[shard])
+ {
+ chatRoomData.activate();
+ }
}
Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
}
else
{
- log.info("Creating ChatRoomData {} with buffer-size {}", chatRoomId, bufferSize);
+ log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
- chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ chatRoomData = new ChatRoomData(clock, service, historyLimit);
this.chatRoomData[shard].put(chatRoomId, chatRoomData);
}