if (metadata != null)
{
log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
- ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition());
+ ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
createChatRoom(chatRoomInfo);
sink.success(chatRoomInfo);
}
CreateChatRoomRequestTo createChatRoomRequestTo,
int partition)
{
- putChatRoom(ChatRoomInfo.of(
+ log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+ ChatRoom chatRoom = new ChatRoom(
chatRoomId,
createChatRoomRequestTo.getName(),
- partition));
+ partition,
+ clock,
+ service,
+ bufferSize);
+ putChatRoom(chatRoom);
}