UUID chatRoomId = UUID.fromString(record.key());
MessageTo messageTo = record.value();
ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
- Mono<Message> result = chatRoom.addMessage(
- messageTo.getId(),
- messageTo.getUser(),
- messageTo.getText());
- result.block().
+ KafkaChatRoomService kafkaChatRoomService =
+ (KafkaChatRoomService) chatRoom.getChatRoomService();
+ Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
+ Instant instant = Instant.ofEpochSecond(record.timestamp());
+ LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+ Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
+ kafkaChatRoomService.persistMessage(message);
}
}
else