WIP
authorKai Moritz <kai@juplo.de>
Mon, 9 Jan 2023 22:10:03 +0000 (23:10 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 9 Jan 2023 22:10:03 +0000 (23:10 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomDao.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageDao.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomDao.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomDao.java
new file mode 100644 (file)
index 0000000..b83966e
--- /dev/null
@@ -0,0 +1,34 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.*;
+
+import java.util.List;
+import java.util.UUID;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "id" })
+@ToString(of = { "id", "name" })
+public class ChatRoomDao
+{
+  private UUID id;
+  private String name;
+  private List<MessageDao> messages;
+
+
+  public static ChatRoomDao from(ChatRoom chatroom)
+  {
+    return new ChatRoomDao(
+        chatroom.getId(),
+        chatroom.getName(),
+        chatroom
+            .getMessages()
+            .map(MessageDao::from)
+            .collectList()
+            .block());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageDao.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageDao.java
new file mode 100644 (file)
index 0000000..472b21f
--- /dev/null
@@ -0,0 +1,38 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "user", "id" })
+@ToString(of = { "user", "id" })
+class MessageDao
+{
+  private Long id;
+  private Long serial;
+  private LocalDateTime time;
+  private String user;
+  private String text;
+
+  Message toMessage()
+  {
+    return new Message(Message.MessageKey.of(user, id), serial, time, text);
+  }
+
+  static MessageDao from(Message message)
+  {
+    return
+        new MessageDao(
+            message.getId(),
+            message.getSerialNumber(),
+            message.getTimestamp(),
+            message.getUsername(),
+            message.getMessageText());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageService.java
new file mode 100644 (file)
index 0000000..1124eff
--- /dev/null
@@ -0,0 +1,5 @@
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+public class MongoDbStorageService
+{
+}
index ccf3d69..200e323 100644 (file)
@@ -1,16 +1,62 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
+import de.juplo.kafka.chat.backend.api.ChatRoomTo;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import reactor.core.publisher.Flux;
 
+import java.io.IOException;
+
 
 public class MongoDbStorageStrategy implements StorageStrategy
 {
   @Override
   public void write(Flux<ChatRoom> chatroomFlux)
   {
-
+    chatroomFlux
+        .log()
+        .doFirst(() ->
+        {
+          try
+          {
+            generator.useDefaultPrettyPrinter();
+            generator.writeStartArray();
+          }
+          catch (IOException e)
+          {
+            throw new RuntimeException(e);
+          }
+        })
+        .doOnTerminate(() ->
+        {
+          try
+          {
+            generator.writeEndArray();
+            generator.close();
+          }
+          catch (IOException e)
+          {
+            throw new RuntimeException(e);
+          }
+        })
+        .subscribe(chatroom ->
+        {
+          try
+          {
+            ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
+            generator.writeObject(chatroomTo);
+            writeMessages(chatroomTo, chatroom.getMessages());
+          }
+          catch (IOException e)
+          {
+            throw new RuntimeException(e);
+          }
+        });
+  }
+    catch (IOException e)
+  {
+    throw new RuntimeException(e);
+  }
   }
 
   @Override