230021987eaf678b8588ef33a0b028834dfda616
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / mongodb / MongoDbStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
5 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9
10 import java.time.Clock;
11 import java.util.UUID;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class MongoDbStorageStrategy implements StorageStrategy
17 {
18   private final ChatRoomRepository repository;
19   private final Clock clock;
20   private final int bufferSize;
21   private final ChatRoomServiceFactory factory;
22
23
24   @Override
25   public void write(Flux<ChatRoom> chatroomFlux)
26   {
27     chatroomFlux
28         .map(ChatRoomTo::from)
29         .subscribe(chatroomTo -> repository.save(chatroomTo));
30   }
31
32   @Override
33   public Flux<ChatRoom> read()
34   {
35     return Flux
36         .fromIterable(repository.findAll())
37         .map(chatRoomTo ->
38         {
39           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
40           return new ChatRoom(
41               chatRoomId,
42               chatRoomTo.getName(),
43               clock,
44               factory.create(
45                   Flux
46                       .fromIterable(chatRoomTo.getMessages())
47                       .map(messageTo -> messageTo.toMessage())),
48               bufferSize);
49         });
50   }
51 }