2aac0fac92bc26a3c248f0229cacbfcd56271475
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.Clock;
11 import java.util.*;
12
13
14 @Slf4j
15 public class SimpleChatHomeService implements ChatHomeService
16 {
17   private final Integer shard;
18   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
19   private final Map<UUID, ChatRoomData> chatRoomData;
20   private final Clock clock;
21   private final int historyLimit;
22
23
24
25   public SimpleChatHomeService(
26       Clock clock,
27       int historyLimit)
28   {
29     this(
30         null,
31         clock,
32         historyLimit);
33   }
34
35   public SimpleChatHomeService(
36       Integer shard,
37       Clock clock,
38       int historyLimit)
39   {
40     log.debug("Creating SimpleChatHomeService");
41
42     this.shard = shard;
43     this.chatRoomInfo = new HashMap<>();
44     this.chatRoomData = new HashMap<>();
45     this.clock = clock;
46     this.historyLimit = historyLimit;
47   }
48
49
50   Mono<Void> restore(StorageStrategy storageStrategy)
51   {
52     chatRoomInfo.clear();
53     chatRoomData.clear();
54
55     return storageStrategy
56         .readChatRoomInfo()
57         .filter(info ->
58         {
59           if (shard == null || info.getShard() == shard)
60           {
61             return true;
62           }
63           else
64           {
65             log.info(
66                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
67                 shard,
68                 info);
69             return false;
70           }
71         })
72         .flatMap(info ->
73         {
74           UUID chatRoomId = info.getId();
75           InMemoryChatMessageService chatMessageService =
76               new InMemoryChatMessageService(chatRoomId);
77
78           chatRoomInfo.put(chatRoomId, info);
79           chatRoomData.put(
80               info.getId(),
81               new ChatRoomData(
82                   clock,
83                   chatMessageService,
84                   historyLimit));
85
86           return chatMessageService.restore(storageStrategy);
87         })
88         .count()
89         .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count))
90         .doOnError(throwable -> log.error("Could not restore {}", this))
91         .then();
92   }
93
94
95   @Override
96   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
97   {
98     log.info("Creating ChatRoom with history-limit {}", historyLimit);
99     ChatMessageService service = new InMemoryChatMessageService(id);
100     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
101     this.chatRoomInfo.put(id, chatRoomInfo);
102     ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
103     this.chatRoomData.put(id, chatRoomData);
104     return Mono.just(chatRoomInfo);
105   }
106
107   @Override
108   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
109   {
110     return Mono
111         .justOrEmpty(chatRoomInfo.get(id))
112         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
113   }
114
115   @Override
116   public Flux<ChatRoomInfo> getChatRoomInfo()
117   {
118     return Flux.fromIterable(chatRoomInfo.values());
119   }
120
121   @Override
122   public Mono<ChatRoomData> getChatRoomData(UUID id)
123   {
124     return Mono
125         .justOrEmpty(chatRoomData.get(id))
126         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
127   }
128
129   @Override
130   public Mono<String[]> getShardOwners()
131   {
132     return Mono.empty();
133   }
134
135   @Override
136   public String toString()
137   {
138     return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
139   }
140 }