FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / storage / mongodb / MongoDbStorageStrategy.java
1 package de.juplo.kafka.chat.backend.storage.mongodb;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9
10 import java.util.UUID;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class MongoDbStorageStrategy implements StorageStrategy
16 {
17   private final ChatRoomRepository chatRoomRepository;
18   private final MessageRepository messageRepository;
19
20
21   @Override
22   public Flux<ChatRoomInfo> writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
23   {
24     return chatRoomInfoFlux
25         .map(ChatRoomTo::from)
26         .flatMap(chatRoomTo -> chatRoomRepository.save(chatRoomTo))
27         .map(ChatRoomTo::toChatRoomInfo);
28   }
29
30   @Override
31   public Flux<ChatRoomInfo> readChatRoomInfo()
32   {
33     return chatRoomRepository
34         .findAll()
35         .map(chatRoomTo ->
36         {
37           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
38           return new ChatRoomInfo(chatRoomId, chatRoomTo.getName(), null);
39         });
40   }
41
42   @Override
43   public Flux<Message> writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
44   {
45     return messageFlux
46         .map(message -> MessageTo.from(chatRoomId, message))
47         .flatMap(messageTo -> messageRepository.save(messageTo))
48         .map(MessageTo::toMessage);
49   }
50
51   @Override
52   public Flux<Message> readChatRoomData(UUID chatRoomId)
53   {
54     return messageRepository
55         .findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())
56         .map(messageTo -> messageTo.toMessage());
57   }
58 }