From: Kai Moritz Date: Fri, 15 Sep 2023 19:35:17 +0000 (+0200) Subject: WIP X-Git-Tag: rebase--2024-01-26--18-11~21 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=c186a4d189e2bfe1483f11f209b74fed1e7c7ff3;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index d2d6f300..181f7bf7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -19,6 +19,7 @@ import reactor.core.publisher.Mono; import java.time.*; import java.util.*; +import java.util.function.Function; import java.util.stream.IntStream; @@ -300,11 +301,16 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener .getChatRoomInfo(id) .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent( id, - (chatRoomId) -> - { - log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - return new ChatRoomData(clock, service, bufferSize); - })); + computeChatRoomData())); + } + + private Function computeChatRoomData() + { + return (chatRoomId) -> + { + log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + return new ChatRoomData(clock, service, bufferSize); + }; } }