refactor: Moved exceptions into package `exceptions` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / SimpleChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
5 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.Clock;
11 import java.util.*;
12
13
14 @Slf4j
15 public class SimpleChatHome implements ChatHome
16 {
17   private final Integer shard;
18   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
19   private final Map<UUID, ChatRoomData> chatRoomData;
20   private final Clock clock;
21   private final int bufferSize;
22
23
24
25   public SimpleChatHome(
26       StorageStrategy storageStrategy,
27       Clock clock,
28       int bufferSize)
29   {
30     this(
31         null,
32         storageStrategy,
33         clock,
34         bufferSize);
35   }
36
37   public SimpleChatHome(
38       Integer shard,
39       StorageStrategy storageStrategy,
40       Clock clock,
41       int bufferSize)
42   {
43     log.info("Created SimpleChatHome for shard {}", shard);
44 ;
45     this.shard = shard;
46     this.chatRoomInfo = new HashMap<>();
47     this.chatRoomData = new HashMap<>();
48     storageStrategy
49         .readChatRoomInfo()
50         .filter(info ->
51         {
52           if (shard == null || info.getShard() == shard)
53           {
54             return true;
55           }
56           else
57           {
58             log.info(
59                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
60                 shard,
61                 info);
62             return false;
63           }
64         })
65         .toStream()
66         .forEach(info ->
67         {
68           UUID chatRoomId = info.getId();
69           chatRoomInfo.put(chatRoomId, info);
70           Flux<Message> messageFlux =
71               storageStrategy.readChatRoomData(chatRoomId);
72           chatRoomData.put(
73               info.getId(),
74               new ChatRoomData(
75                   clock,
76                   new InMemoryChatRoomService(messageFlux),
77                   bufferSize));
78         });
79     this.clock = clock;
80     this.bufferSize = bufferSize;
81   }
82
83
84   @Override
85   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
86   {
87     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
88     ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
89     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
90     this.chatRoomInfo.put(id, chatRoomInfo);
91     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
92     this.chatRoomData.put(id, chatRoomData);
93     return Mono.just(chatRoomInfo);
94   }
95
96   @Override
97   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
98   {
99     return Mono
100         .justOrEmpty(chatRoomInfo.get(id))
101         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
102   }
103
104   @Override
105   public Flux<ChatRoomInfo> getChatRoomInfo()
106   {
107     return Flux.fromIterable(chatRoomInfo.values());
108   }
109
110   @Override
111   public Mono<ChatRoomData> getChatRoomData(UUID id)
112   {
113     return Mono
114         .justOrEmpty(chatRoomData.get(id))
115         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
116   }
117
118   public Flux<ChatRoomData> getChatRoomData()
119   {
120     return Flux.fromIterable(chatRoomData.values());
121   }
122 }