refactor: One stream -> using `flatMap` instead of an inner `subscribe`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.Clock;
11 import java.util.*;
12
13
14 @Slf4j
15 public class SimpleChatHomeService implements ChatHomeService
16 {
17   private final Integer shard;
18   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
19   private final Map<UUID, ChatRoomData> chatRoomData;
20   private final Clock clock;
21   private final int bufferSize;
22
23
24
25   public SimpleChatHomeService(
26       StorageStrategy storageStrategy,
27       Clock clock,
28       int bufferSize)
29   {
30     this(
31         null,
32         storageStrategy,
33         clock,
34         bufferSize);
35   }
36
37   public SimpleChatHomeService(
38       Integer shard,
39       StorageStrategy storageStrategy,
40       Clock clock,
41       int bufferSize)
42   {
43 ;
44     this.shard = shard;
45     log.info("Created {}", this);
46
47     this.chatRoomInfo = new HashMap<>();
48     this.chatRoomData = new HashMap<>();
49     storageStrategy
50         .readChatRoomInfo()
51         .filter(info ->
52         {
53           if (shard == null || info.getShard() == shard)
54           {
55             return true;
56           }
57           else
58           {
59             log.info(
60                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
61                 shard,
62                 info);
63             return false;
64           }
65         })
66         .toStream()
67         .forEach(info ->
68         {
69           UUID chatRoomId = info.getId();
70           chatRoomInfo.put(chatRoomId, info);
71           Flux<Message> messageFlux =
72               storageStrategy.readChatRoomData(chatRoomId);
73           chatRoomData.put(
74               info.getId(),
75               new ChatRoomData(
76                   clock,
77                   new InMemoryChatMessageService(messageFlux),
78                   bufferSize));
79         });
80     this.clock = clock;
81     this.bufferSize = bufferSize;
82   }
83
84
85   @Override
86   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
87   {
88     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
89     ChatMessageService service = new InMemoryChatMessageService(Flux.empty());
90     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
91     this.chatRoomInfo.put(id, chatRoomInfo);
92     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
93     this.chatRoomData.put(id, chatRoomData);
94     return Mono.just(chatRoomInfo);
95   }
96
97   @Override
98   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
99   {
100     return Mono
101         .justOrEmpty(chatRoomInfo.get(id))
102         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
103   }
104
105   @Override
106   public Flux<ChatRoomInfo> getChatRoomInfo()
107   {
108     return Flux.fromIterable(chatRoomInfo.values());
109   }
110
111   @Override
112   public Mono<ChatRoomData> getChatRoomData(UUID id)
113   {
114     return Mono
115         .justOrEmpty(chatRoomData.get(id))
116         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
117   }
118
119   @Override
120   public Mono<String[]> getShardOwners()
121   {
122     return Mono.empty();
123   }
124
125   @Override
126   public String toString()
127   {
128     return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
129   }
130 }