From 9f01b420a18c3550ad0033d5f0977a974e585818 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 23 Mar 2024 16:33:13 +0100 Subject: [PATCH] refactor: Only `ChannelMediator#chatRoomCreated()` creates `ChatRoomData` --- .../implementation/kafka/DataChannel.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 0ead9ef9..0bffa0b2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -287,7 +287,12 @@ public class DataChannel implements Channel, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoomData chatRoomData = computeChatRoomData(chatRoomId, partition); + ChatRoomInfo chatRoomInfo = Mono + .just(chatRoomId) + .flatMap(id -> channelMediator.getChatRoomInfo(id)) + .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))) + .block(); + ChatRoomData chatRoomData = this.chatRoomData[chatRoomInfo.getShard()].get(chatRoomId); KafkaChatMessageService kafkaChatRoomService = (KafkaChatMessageService) chatRoomData.getChatRoomService(); @@ -344,9 +349,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener { int shard = chatRoomInfo.getShard(); - ChatRoomData chatRoomData = computeChatRoomData( - chatRoomInfo.getId(), - chatRoomInfo.getShard()); + ChatRoomData chatRoomData = computeChatRoomData(chatRoomInfo); // TODO: Possible race-condition in case of an ongoing rebalance! if (isShardOwned[shard]) @@ -371,15 +374,17 @@ public class DataChannel implements Channel, ConsumerRebalanceListener return Mono.justOrEmpty(chatRoomData[shard].get(id)); } - private ChatRoomData computeChatRoomData(UUID chatRoomId, int shard) + private ChatRoomData computeChatRoomData(ChatRoomInfo chatRoomInfo) { + UUID chatRoomId = chatRoomInfo.getId(); + int shard = chatRoomInfo.getShard(); ChatRoomData chatRoomData = this.chatRoomData[shard].get(chatRoomId); if (chatRoomData != null) { - log.info( + log.error( "Ignoring request to create already existing ChatRoomData for {}", - chatRoomId); + chatRoomInfo); } else { -- 2.39.5