From: Kai Moritz Date: Fri, 18 Aug 2023 13:46:59 +0000 (+0200) Subject: NG X-Git-Tag: rebase--2023-08-18-abends~11 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=8b157b4b3978d89a6a19462b4bd6108847407ead;p=demos%2Fkafka%2Fchat NG --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 94f6fa6b..f6e70912 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -90,7 +90,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener if (metadata != null) { log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition()); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); createChatRoom(chatRoomInfo); sink.success(chatRoomInfo); } @@ -280,10 +280,16 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener CreateChatRoomRequestTo createChatRoomRequestTo, int partition) { - putChatRoom(ChatRoomInfo.of( + log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); + ChatRoom chatRoom = new ChatRoom( chatRoomId, createChatRoomRequestTo.getName(), - partition)); + partition, + clock, + service, + bufferSize); + putChatRoom(chatRoom); }