refactor: Split `ChatRoomInfo` and `ChatRoomData` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / SimpleChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8
9 import java.time.Clock;
10 import java.util.*;
11
12
13 @Slf4j
14 public class SimpleChatHome implements ChatHome
15 {
16   private final Integer shard;
17   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
18   private final Map<UUID, ChatRoomData> chatRoomData;
19   private final Clock clock;
20   private final int bufferSize;
21
22
23
24   public SimpleChatHome(
25       StorageStrategy storageStrategy,
26       Clock clock,
27       int bufferSize)
28   {
29     this(
30         null,
31         storageStrategy,
32         clock,
33         bufferSize);
34   }
35
36   public SimpleChatHome(
37       Integer shard,
38       StorageStrategy storageStrategy,
39       Clock clock,
40       int bufferSize)
41   {
42     log.info("Created SimpleChatHome for shard {}", shard);
43 ;
44     this.shard = shard;
45     this.chatRoomInfo = new HashMap<>();
46     this.chatRoomData = new HashMap<>();
47     storageStrategy
48         .readChatRoomInfo()
49         .filter(info ->
50         {
51           if (shard == null || info.getShard() == shard)
52           {
53             return true;
54           }
55           else
56           {
57             log.info(
58                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
59                 shard,
60                 info);
61             return false;
62           }
63         })
64         .toStream()
65         .forEach(info ->
66         {
67           UUID chatRoomId = info.getId();
68           chatRoomInfo.put(chatRoomId, info);
69           Flux<Message> messageFlux =
70               storageStrategy.readChatRoomData(chatRoomId);
71           chatRoomData.put(
72               info.getId(),
73               new ChatRoomData(
74                   clock,
75                   new InMemoryChatRoomService(messageFlux),
76                   bufferSize));
77         });
78     this.clock = clock;
79     this.bufferSize = bufferSize;
80   }
81
82
83   @Override
84   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
85   {
86     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
87     ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
88     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
89     this.chatRoomInfo.put(id, chatRoomInfo);
90     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
91     this.chatRoomData.put(id, chatRoomData);
92     return Mono.just(chatRoomInfo);
93   }
94
95   @Override
96   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
97   {
98     return Mono
99         .justOrEmpty(chatRoomInfo.get(id))
100         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
101   }
102
103   @Override
104   public Flux<ChatRoomInfo> getChatRoomInfo()
105   {
106     return Flux.fromIterable(chatRoomInfo.values());
107   }
108
109   @Override
110   public Mono<ChatRoomData> getChatRoomData(UUID id)
111   {
112     return Mono
113         .justOrEmpty(chatRoomData.get(id))
114         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
115   }
116
117   public Flux<ChatRoomData> getChatRoomData()
118   {
119     return Flux.fromIterable(chatRoomData.values());
120   }
121 }