Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
- ChatRoomData chatRoomData = computeChatRoomData(chatRoomId, partition);
+ ChatRoomInfo chatRoomInfo = Mono
+ .just(chatRoomId)
+ .flatMap(id -> channelMediator.getChatRoomInfo(id))
+ .retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
+ .block();
+ ChatRoomData chatRoomData = this.chatRoomData[chatRoomInfo.getShard()].get(chatRoomId);
KafkaChatMessageService kafkaChatRoomService =
(KafkaChatMessageService) chatRoomData.getChatRoomService();
{
int shard = chatRoomInfo.getShard();
- ChatRoomData chatRoomData = computeChatRoomData(
- chatRoomInfo.getId(),
- chatRoomInfo.getShard());
+ ChatRoomData chatRoomData = computeChatRoomData(chatRoomInfo);
// TODO: Possible race-condition in case of an ongoing rebalance!
if (isShardOwned[shard])
return Mono.justOrEmpty(chatRoomData[shard].get(id));
}
- private ChatRoomData computeChatRoomData(UUID chatRoomId, int shard)
+ private ChatRoomData computeChatRoomData(ChatRoomInfo chatRoomInfo)
{
+ UUID chatRoomId = chatRoomInfo.getId();
+ int shard = chatRoomInfo.getShard();
ChatRoomData chatRoomData = this.chatRoomData[shard].get(chatRoomId);
if (chatRoomData != null)
{
- log.info(
+ log.error(
"Ignoring request to create already existing ChatRoomData for {}",
- chatRoomId);
+ chatRoomInfo);
}
else
{