8e3cc4305b3df480059eb4d934996f8c36f4bd7e
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.Clock;
11 import java.util.*;
12
13
14 @Slf4j
15 public class SimpleChatHomeService implements ChatHomeService
16 {
17   private final Integer shard;
18   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
19   private final Map<UUID, ChatRoomData> chatRoomData;
20   private final Clock clock;
21   private final int historyLimit;
22
23
24
25   public SimpleChatHomeService(
26       Clock clock,
27       int historyLimit)
28   {
29     this(
30         null,
31         clock,
32         historyLimit);
33   }
34
35   public SimpleChatHomeService(
36       Integer shard,
37       Clock clock,
38       int historyLimit)
39   {
40     log.debug("Creating SimpleChatHomeService");
41
42     this.shard = shard;
43     this.chatRoomInfo = new HashMap<>();
44     this.chatRoomData = new HashMap<>();
45     this.clock = clock;
46     this.historyLimit = historyLimit;
47   }
48
49
50   Mono<Void> restore(StorageStrategy storageStrategy)
51   {
52     chatRoomInfo.clear();
53     chatRoomData.clear();
54
55     return storageStrategy
56         .readChatRoomInfo()
57         .filter(info ->
58         {
59           if (shard == null || info.getShard() == shard)
60           {
61             return true;
62           }
63           else
64           {
65             log.info(
66                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
67                 shard,
68                 info);
69             return false;
70           }
71         })
72         .flatMap(info ->
73         {
74           UUID chatRoomId = info.getId();
75           InMemoryChatMessageService chatMessageService =
76               new InMemoryChatMessageService(chatRoomId);
77
78           chatRoomInfo.put(chatRoomId, info);
79           ChatRoomData chatRoomData =
80               new ChatRoomData(
81                   clock,
82                   chatMessageService,
83                   historyLimit);
84           chatRoomData.activate();
85           this.chatRoomData.put(info.getId(), chatRoomData);
86
87           return chatMessageService.restore(storageStrategy);
88         })
89         .count()
90         .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count))
91         .doOnError(throwable -> log.error("Could not restore {}", this))
92         .then();
93   }
94
95
96   @Override
97   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
98   {
99     log.info("Creating ChatRoom with history-limit {}", historyLimit);
100     ChatMessageService service = new InMemoryChatMessageService(id);
101     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
102     this.chatRoomInfo.put(id, chatRoomInfo);
103     ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
104     chatRoomData.activate();
105     this.chatRoomData.put(id, chatRoomData);
106     return Mono.just(chatRoomInfo);
107   }
108
109   @Override
110   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
111   {
112     return Mono
113         .justOrEmpty(chatRoomInfo.get(id))
114         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
115   }
116
117   @Override
118   public Flux<ChatRoomInfo> getChatRoomInfo()
119   {
120     return Flux.fromIterable(chatRoomInfo.values());
121   }
122
123   @Override
124   public Mono<ChatRoomData> getChatRoomData(UUID id)
125   {
126     return Mono
127         .justOrEmpty(chatRoomData.get(id))
128         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
129   }
130
131   @Override
132   public Mono<String[]> getShardOwners()
133   {
134     return Mono.empty();
135   }
136
137   @Override
138   public String toString()
139   {
140     return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
141   }
142 }