--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.*;
+
+import java.util.List;
+import java.util.UUID;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "id" })
+@ToString(of = { "id", "name" })
+public class ChatRoomDao
+{
+ private UUID id;
+ private String name;
+ private List<MessageDao> messages;
+
+
+ public static ChatRoomDao from(ChatRoom chatroom)
+ {
+ return new ChatRoomDao(
+ chatroom.getId(),
+ chatroom.getName(),
+ chatroom
+ .getMessages()
+ .map(MessageDao::from)
+ .collectList()
+ .block());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter(AccessLevel.PACKAGE)
+@Setter(AccessLevel.PACKAGE)
+@EqualsAndHashCode(of = { "user", "id" })
+@ToString(of = { "user", "id" })
+class MessageDao
+{
+ private Long id;
+ private Long serial;
+ private LocalDateTime time;
+ private String user;
+ private String text;
+
+ Message toMessage()
+ {
+ return new Message(Message.MessageKey.of(user, id), serial, time, text);
+ }
+
+ static MessageDao from(Message message)
+ {
+ return
+ new MessageDao(
+ message.getId(),
+ message.getSerialNumber(),
+ message.getTimestamp(),
+ message.getUsername(),
+ message.getMessageText());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+public class MongoDbStorageService
+{
+}
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import reactor.core.publisher.Flux;
+import java.io.IOException;
+
public class MongoDbStorageStrategy implements StorageStrategy
{
@Override
public void write(Flux<ChatRoom> chatroomFlux)
{
-
+ chatroomFlux
+ .log()
+ .doFirst(() ->
+ {
+ try
+ {
+ generator.useDefaultPrettyPrinter();
+ generator.writeStartArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .doOnTerminate(() ->
+ {
+ try
+ {
+ generator.writeEndArray();
+ generator.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe(chatroom ->
+ {
+ try
+ {
+ ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
+ generator.writeObject(chatroomTo);
+ writeMessages(chatroomTo, chatroom.getMessages());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
@Override