1 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
3 import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
6 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import reactor.core.publisher.Flux;
11 import java.time.Clock;
12 import java.util.UUID;
15 @RequiredArgsConstructor
17 public class MongoDbStorageStrategy implements StorageStrategy
19 private final ChatRoomRepository repository;
20 private final Clock clock;
21 private final int bufferSize;
22 private final ShardingStrategy shardingStrategy;
23 private final ChatRoomServiceFactory factory;
27 public void write(Flux<ChatRoom> chatroomFlux)
30 .map(ChatRoomTo::from)
31 .subscribe(chatroomTo -> repository.save(chatroomTo));
35 public Flux<ChatRoom> read()
38 .fromIterable(repository.findAll())
41 UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
42 int shard = shardingStrategy.selectShard(chatRoomId);
50 .fromIterable(chatRoomTo.getMessages())
51 .map(messageTo -> messageTo.toMessage())),