NG
authorKai Moritz <kai@juplo.de>
Fri, 18 Aug 2023 13:46:59 +0000 (15:46 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 18 Aug 2023 15:18:41 +0000 (17:18 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java

index 94f6fa6..f6e7091 100644 (file)
@@ -90,7 +90,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
         if (metadata != null)
         {
           log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
-          ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition());
+          ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
           createChatRoom(chatRoomInfo);
           sink.success(chatRoomInfo);
         }
@@ -280,10 +280,16 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
       CreateChatRoomRequestTo createChatRoomRequestTo,
       int partition)
   {
-    putChatRoom(ChatRoomInfo.of(
+    log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
+    KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+    ChatRoom chatRoom = new ChatRoom(
         chatRoomId,
         createChatRoomRequestTo.getName(),
-        partition));
+        partition,
+        clock,
+        service,
+        bufferSize);
+    putChatRoom(chatRoom);
   }