5c3fe2e5e1d8c3ac60b9313dcb6c3f7dfc8ffe0e
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.Clock;
11 import java.util.*;
12
13
14 @Slf4j
15 public class SimpleChatHomeService implements ChatHomeService
16 {
17   private final Integer shard;
18   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
19   private final Map<UUID, ChatRoomData> chatRoomData;
20   private final Clock clock;
21   private final int bufferSize;
22
23
24
25   public SimpleChatHomeService(
26       Clock clock,
27       int bufferSize)
28   {
29     this(
30         null,
31         clock,
32         bufferSize);
33   }
34
35   public SimpleChatHomeService(
36       Integer shard,
37       Clock clock,
38       int bufferSize)
39   {
40     log.debug("Creating SimpleChatHomeService");
41
42     this.shard = shard;
43     this.chatRoomInfo = new HashMap<>();
44     this.chatRoomData = new HashMap<>();
45     this.clock = clock;
46     this.bufferSize = bufferSize;
47   }
48
49
50   Mono<Void> restore(StorageStrategy storageStrategy)
51   {
52     return storageStrategy
53         .readChatRoomInfo()
54         .filter(info ->
55         {
56           if (shard == null || info.getShard() == shard)
57           {
58             return true;
59           }
60           else
61           {
62             log.info(
63                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
64                 shard,
65                 info);
66             return false;
67           }
68         })
69         .flatMap(info ->
70         {
71           UUID chatRoomId = info.getId();
72           InMemoryChatMessageService chatMessageService =
73               new InMemoryChatMessageService(chatRoomId);
74
75           chatRoomInfo.put(chatRoomId, info);
76           chatRoomData.put(
77               info.getId(),
78               new ChatRoomData(
79                   clock,
80                   chatMessageService,
81                   bufferSize));
82
83           return chatMessageService.restore(storageStrategy);
84         })
85         .then()
86         .doOnSuccess(empty -> log.info("Restored {}", this))
87         .doOnError(throwable -> log.error("Could not restore {}", this));
88   }
89
90
91   @Override
92   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
93   {
94     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
95     ChatMessageService service = new InMemoryChatMessageService(id);
96     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
97     this.chatRoomInfo.put(id, chatRoomInfo);
98     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
99     this.chatRoomData.put(id, chatRoomData);
100     return Mono.just(chatRoomInfo);
101   }
102
103   @Override
104   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
105   {
106     return Mono
107         .justOrEmpty(chatRoomInfo.get(id))
108         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
109   }
110
111   @Override
112   public Flux<ChatRoomInfo> getChatRoomInfo()
113   {
114     return Flux.fromIterable(chatRoomInfo.values());
115   }
116
117   @Override
118   public Mono<ChatRoomData> getChatRoomData(UUID id)
119   {
120     return Mono
121         .justOrEmpty(chatRoomData.get(id))
122         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
123   }
124
125   @Override
126   public Mono<String[]> getShardOwners()
127   {
128     return Mono.empty();
129   }
130
131   @Override
132   public String toString()
133   {
134     return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
135   }
136 }