1 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
5 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
10 import java.time.Clock;
11 import java.util.UUID;
/**
 * {@link StorageStrategy} implementation that uses a {@link ChatRoomRepository}
 * as its backing store.
 * <p>
 * All collaborators are declared as {@code final} fields; Lombok's
 * {@code @RequiredArgsConstructor} generates the constructor that accepts
 * them, in the order in which the fields are declared.
 */
14 @RequiredArgsConstructor
16 public class MongoDbStorageStrategy implements StorageStrategy
// Repository the chat rooms are saved to (write) and loaded from (read).
18 private final ChatRoomRepository repository;
// NOTE(review): clock, bufferSize and factory are presumably needed to
// rebuild ChatRoom instances in read() -- confirm against the full method.
19 private final Clock clock;
20 private final int bufferSize;
21 private final ChatRoomServiceFactory factory;
/**
 * Persists the given chat rooms.
 * <p>
 * Each element is converted with {@code ChatRoomTo.from(..)} and the result
 * is passed to {@code repository.save(..)}.
 *
 * @param chatroomFlux the chat rooms to store
 */
25 public void write(Flux<ChatRoom> chatroomFlux)
28 .map(ChatRoomTo::from)
// NOTE(review): subscribe(..) receives an onNext consumer only. No error
// callback is registered and, as the method returns void, the caller gets
// no signal for completion or failure of the saves -- confirm this
// fire-and-forget behavior is intended.
29 .subscribe(chatroomTo -> repository.save(chatroomTo));
/**
 * Loads the stored chat rooms.
 * <p>
 * Iterates over everything returned by {@code repository.findAll()}, parses
 * the stored id of each entry into a {@link UUID} and converts the stored
 * messages with {@code toMessage()}.
 *
 * @return the loaded chat rooms (presumably one per stored entry -- TODO confirm)
 */
33 public Flux<ChatRoom> read()
// findAll() is consumed as an Iterable here.
36 .fromIterable(repository.findAll())
// The id is stored as a String; UUID.fromString(..) throws an
// IllegalArgumentException if it is not a valid UUID representation.
39 UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
// Turn the stored message representations back into messages.
46 .fromIterable(chatRoomTo.getMessages())
47 .map(messageTo -> messageTo.toMessage())),