From ac8d83c339a5a4e966099822c7a08d68f573480f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 9 Jan 2023 23:10:03 +0100 Subject: [PATCH] WIP --- .../storage/mongodb/ChatRoomDao.java | 34 +++++++++++++ .../storage/mongodb/MessageDao.java | 38 +++++++++++++++ .../mongodb/MongoDbStorageService.java | 5 ++ .../mongodb/MongoDbStorageStrategy.java | 48 ++++++++++++++++++- 4 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomDao.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageDao.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageService.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomDao.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomDao.java new file mode 100644 index 00000000..b83966e5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomDao.java @@ -0,0 +1,34 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import lombok.*; + +import java.util.List; +import java.util.UUID; + + +@AllArgsConstructor +@NoArgsConstructor +@Getter(AccessLevel.PACKAGE) +@Setter(AccessLevel.PACKAGE) +@EqualsAndHashCode(of = { "id" }) +@ToString(of = { "id", "name" }) +public class ChatRoomDao +{ + private UUID id; + private String name; + private List messages; + + + public static ChatRoomDao from(ChatRoom chatroom) + { + return new ChatRoomDao( + chatroom.getId(), + chatroom.getName(), + chatroom + .getMessages() + .map(MessageDao::from) + .collectList() + .block()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageDao.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageDao.java new file mode 100644 index 00000000..472b21fa --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageDao.java @@ -0,0 +1,38 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.*; + +import java.time.LocalDateTime; + + +@AllArgsConstructor +@NoArgsConstructor +@Getter(AccessLevel.PACKAGE) +@Setter(AccessLevel.PACKAGE) +@EqualsAndHashCode(of = { "user", "id" }) +@ToString(of = { "user", "id" }) +class MessageDao +{ + private Long id; + private Long serial; + private LocalDateTime time; + private String user; + private String text; + + Message toMessage() + { + return new Message(Message.MessageKey.of(user, id), serial, time, text); + } + + static MessageDao from(Message message) + { + return + new MessageDao( + message.getId(), + message.getSerialNumber(), + message.getTimestamp(), + message.getUsername(), + message.getMessageText()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageService.java new file mode 100644 index 00000000..1124eff6 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageService.java @@ -0,0 +1,5 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +public class MongoDbStorageService +{ +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index ccf3d696..200e323f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -1,16 +1,62 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; +import de.juplo.kafka.chat.backend.api.ChatRoomTo; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import reactor.core.publisher.Flux; +import java.io.IOException; + public class MongoDbStorageStrategy implements StorageStrategy { @Override public void write(Flux chatroomFlux) { - + chatroomFlux + .log() + .doFirst(() -> + { + try + { + generator.useDefaultPrettyPrinter(); + generator.writeStartArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .doOnTerminate(() -> + { + try + { + generator.writeEndArray(); + generator.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .subscribe(chatroom -> + { + try + { + ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom); + generator.writeObject(chatroomTo); + writeMessages(chatroomTo, chatroom.getMessages()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + } + catch (IOException e) + { + throw new RuntimeException(e); + } } @Override -- 2.20.1