From 8b157b4b3978d89a6a19462b4bd6108847407ead Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 18 Aug 2023 15:46:59 +0200 Subject: [PATCH] NG --- .../persistence/kafka/ChatMessageChannel.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 94f6fa6b..f6e70912 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -90,7 +90,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener if (metadata != null) { log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition()); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); createChatRoom(chatRoomInfo); sink.success(chatRoomInfo); } @@ -280,10 +280,16 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener CreateChatRoomRequestTo createChatRoomRequestTo, int partition) { - putChatRoom(ChatRoomInfo.of( + log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); + ChatRoom chatRoom = new ChatRoom( chatRoomId, createChatRoomRequestTo.getName(), - partition)); + partition, + clock, + service, + bufferSize); + putChatRoom(chatRoom); } -- 2.20.1