isShardOwned[partition] = true;
this.currentOffset[partition] = currentOffset;
+ chatRoomData[partition]
+ .values()
+ .forEach(chatRoomData -> chatRoomData.activate());
+
log.info(
"Partition assigned: {} - loading messages: next={} -> current={}",
partition,
partitions.forEach(topicPartition ->
{
int partition = topicPartition.partition();
- chatRoomData[partition]
- .values()
- .forEach(chatRoomData -> chatRoomData.close());
isShardOwned[partition] = false;
nextOffset[partition] = consumer.position(topicPartition);
+
log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+
+ chatRoomData[partition]
+ .values()
+ .forEach(chatRoomData -> chatRoomData.deactivate());
+
channelMediator.shardRevoked(partition);
});
}
void createChatRoomData(ChatRoomInfo chatRoomInfo)
{
- computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard());
+ ChatRoomData chatRoomData = computeChatRoomData(
+ chatRoomInfo.getId(),
+ chatRoomInfo.getShard());
+ chatRoomData.activate();
}
Mono<ChatRoomData> getChatRoomData(int shard, UUID id)