feat: Prepared the application for sharding
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / mongodb / MongoDbStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
5 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9
10 import java.time.Clock;
11 import java.util.UUID;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class MongoDbStorageStrategy implements StorageStrategy
17 {
18   private final ChatRoomRepository repository;
19   private final Clock clock;
20   private final int bufferSize;
21   private final ChatRoomServiceFactory factory;
22
23
24   @Override
25   public void write(Flux<ChatRoom> chatroomFlux)
26   {
27     chatroomFlux
28         .map(ChatRoomTo::from)
29         .subscribe(chatroomTo -> repository.save(chatroomTo));
30   }
31
32   @Override
33   public Flux<ChatRoom> read()
34   {
35     return Flux
36         .fromIterable(repository.findAll())
37         .map(chatRoomTo -> new ChatRoom(
38             UUID.fromString(chatRoomTo.getId()),
39             chatRoomTo.getName(),
40             chatRoomTo.getShard(),
41             clock,
42             factory.create(
43                 Flux
44                     .fromIterable(chatRoomTo.getMessages())
45                     .map(messageTo -> messageTo.toMessage())),
46             bufferSize));
47   }
48 }