371d4a874f221cf9a0de810e531af14a83f5215e
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.Clock;
11 import java.util.*;
12
13
14 @Slf4j
15 public class SimpleChatHomeService implements ChatHomeService
16 {
17   private final Integer shard;
18   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
19   private final Map<UUID, ChatRoomData> chatRoomData;
20   private final Clock clock;
21   private final int bufferSize;
22
23
24
25   public SimpleChatHomeService(
26       Clock clock,
27       int bufferSize)
28   {
29     this(
30         null,
31         clock,
32         bufferSize);
33   }
34
35   public SimpleChatHomeService(
36       Integer shard,
37       Clock clock,
38       int bufferSize)
39   {
40     log.debug("Creating SimpleChatHomeService");
41
42     this.shard = shard;
43     this.chatRoomInfo = new HashMap<>();
44     this.chatRoomData = new HashMap<>();
45     this.clock = clock;
46     this.bufferSize = bufferSize;
47   }
48
49
50   Mono<Void> restore(StorageStrategy storageStrategy)
51   {
52     chatRoomInfo.clear();
53     chatRoomData.clear();
54
55     return storageStrategy
56         .readChatRoomInfo()
57         .filter(info ->
58         {
59           if (shard == null || info.getShard() == shard)
60           {
61             return true;
62           }
63           else
64           {
65             log.info(
66                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
67                 shard,
68                 info);
69             return false;
70           }
71         })
72         .flatMap(info ->
73         {
74           UUID chatRoomId = info.getId();
75           InMemoryChatMessageService chatMessageService =
76               new InMemoryChatMessageService(chatRoomId);
77
78           chatRoomInfo.put(chatRoomId, info);
79           chatRoomData.put(
80               info.getId(),
81               new ChatRoomData(
82                   clock,
83                   chatMessageService,
84                   bufferSize));
85
86           return chatMessageService.restore(storageStrategy);
87         })
88         .then()
89         .doOnSuccess(empty -> log.info("Restored {}", this))
90         .doOnError(throwable -> log.error("Could not restore {}", this));
91   }
92
93
94   @Override
95   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
96   {
97     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
98     ChatMessageService service = new InMemoryChatMessageService(id);
99     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
100     this.chatRoomInfo.put(id, chatRoomInfo);
101     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
102     this.chatRoomData.put(id, chatRoomData);
103     return Mono.just(chatRoomInfo);
104   }
105
106   @Override
107   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
108   {
109     return Mono
110         .justOrEmpty(chatRoomInfo.get(id))
111         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
112   }
113
114   @Override
115   public Flux<ChatRoomInfo> getChatRoomInfo()
116   {
117     return Flux.fromIterable(chatRoomInfo.values());
118   }
119
120   @Override
121   public Mono<ChatRoomData> getChatRoomData(UUID id)
122   {
123     return Mono
124         .justOrEmpty(chatRoomData.get(id))
125         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
126   }
127
128   @Override
129   public Mono<String[]> getShardOwners()
130   {
131     return Mono.empty();
132   }
133
134   @Override
135   public String toString()
136   {
137     return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
138   }
139 }