NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
index 5133d1a..f105902 100644 (file)
@@ -137,11 +137,13 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
             UUID chatRoomId = UUID.fromString(record.key());
             MessageTo messageTo = record.value();
             ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
-            Mono<Message> result = chatRoom.addMessage(
-                messageTo.getId(),
-                messageTo.getUser(),
-                messageTo.getText());
-            result.block().
+            KafkaChatRoomService kafkaChatRoomService =
+                (KafkaChatRoomService) chatRoom.getChatRoomService();
+            Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
+            Instant instant = Instant.ofEpochSecond(record.timestamp());
+            LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+            Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
+            kafkaChatRoomService.persistMessage(message);
           }
         }
         else