TMP:Holzweg so, Refaktorisierung nötig
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / storage / mongodb / MongoDbStorageStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
2
3 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
6 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import reactor.core.publisher.Flux;
10
11 import java.time.Clock;
12 import java.util.UUID;
13
14
15 @RequiredArgsConstructor
16 @Slf4j
17 public class MongoDbStorageStrategy implements StorageStrategy
18 {
19   private final ChatRoomRepository repository;
20   private final Clock clock;
21   private final int bufferSize;
22   private final ShardingStrategy shardingStrategy;
23   private final ChatRoomServiceFactory factory;
24
25
26   @Override
27   public void write(Flux<ChatRoomInfo> chatroomFlux)
28   {
29     chatroomFlux
30         .map(ChatRoomTo::from)
31         .subscribe(chatroomTo -> repository.save(chatroomTo));
32   }
33
34   @Override
35   public Flux<ChatRoom> read()
36   {
37     return Flux
38         .fromIterable(repository.findAll())
39         .map(chatRoomTo ->
40         {
41           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
42           int shard = shardingStrategy.selectShard(chatRoomId);
43           return new ChatRoom(
44               chatRoomId,
45               chatRoomTo.getName(),
46               shard,
47               clock,
48               factory.create(
49                   Flux
50                       .fromIterable(chatRoomTo.getMessages())
51                       .map(messageTo -> messageTo.toMessage())),
52               bufferSize);
53         });
54   }
55 }