From f7a0a821c0beb1173a5de405aa664feb532d634f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Sep 2023 21:35:17 +0200 Subject: [PATCH] refactor: DRY for computation of new `ChatRoomData` instances --- .../implementation/kafka/DataChannel.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index d2d6f300..da906631 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -19,6 +19,7 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; +import java.util.function.Function; import java.util.stream.IntStream; @@ -244,14 +245,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent( - chatRoomId, - (id) -> - { - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - return new ChatRoomData(clock, service, bufferSize); - }); + ChatRoomData chatRoomData = this + .chatRoomData[partition] + .computeIfAbsent(chatRoomId, this::computeChatRoomData); KafkaChatMessageService kafkaChatRoomService = (KafkaChatMessageService) chatRoomData.getChatRoomService(); @@ -298,13 +294,14 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return infoChannel .getChatRoomInfo(id) - .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent( - id, - (chatRoomId) -> - { - log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - return new ChatRoomData(clock, service, bufferSize); - })); + .map(chatRoomInfo -> + chatRoomData[shard].computeIfAbsent(id, this::computeChatRoomData)); + } + + private ChatRoomData computeChatRoomData(UUID chatRoomId) + { + log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + return new ChatRoomData(clock, service, bufferSize); } } -- 2.20.1