+
+ private ChatRoomData computeChatRoomData(UUID chatRoomId, int shard)
+ {
+ ChatRoomData chatRoomData = this.chatRoomData[shard].get(chatRoomId);
+
+ if (chatRoomData != null)
+ {
+ log.info(
+ "Ignoring request to create already existing ChatRoomData for {}",
+ chatRoomId);
+ }
+ else
+ {
+ log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
+ KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
+ chatRoomData = new ChatRoomData(clock, service, historyLimit);
+ this.chatRoomData[shard].put(chatRoomId, chatRoomData);
+ }
+
+ return chatRoomData;
+ }
+
+ ConsumerGroupMetadata getConsumerGroupMetadata()
+ {
+ return consumer.groupMetadata();
+ }