- private void loadChatMessage(
- UUID chatRoomId,
- LocalDateTime timestamp,
- long offset,
- EventChatMessageReceivedTo chatMessageTo,
- int partition)
- {
- Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
- Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
-
- ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
- KafkaChatMessageService kafkaChatRoomService =
- (KafkaChatMessageService) chatRoomData.getChatRoomService();
-
- kafkaChatRoomService.persistMessage(message);
- }
-
- private boolean isLoadingCompleted()
- {
- return IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
- }
-
- private void pauseAllOwnedPartions()
- {
- consumer.pause(IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> new TopicPartition(topic, shard))
- .toList());
- }
-
-
- private void putChatRoom(
- UUID chatRoomId,
- String name,
- Integer partition,
- ChatRoomData chatRoomData)
- {
- if (this.chatRoomInfo[partition].containsKey(chatRoomId))