refactor: RED - Refined success/error-handling for restore-operations
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / SimpleChatHomeService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.Clock;
11 import java.util.*;
12
13
14 @Slf4j
15 public class SimpleChatHomeService implements ChatHomeService
16 {
17   private final Integer shard;
18   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
19   private final Map<UUID, ChatRoomData> chatRoomData;
20   private final Clock clock;
21   private final int bufferSize;
22
23
24
25   public SimpleChatHomeService(
26       StorageStrategy storageStrategy,
27       Clock clock,
28       int bufferSize)
29   {
30     this(
31         null,
32         storageStrategy,
33         clock,
34         bufferSize);
35   }
36
37   public SimpleChatHomeService(
38       Integer shard,
39       StorageStrategy storageStrategy,
40       Clock clock,
41       int bufferSize)
42   {
43     log.debug("Creating SimpleChatHomeService");
44
45     this.shard = shard;
46     this.chatRoomInfo = new HashMap<>();
47     this.chatRoomData = new HashMap<>();
48
49     storageStrategy
50         .readChatRoomInfo()
51         .filter(info ->
52         {
53           if (shard == null || info.getShard() == shard)
54           {
55             return true;
56           }
57           else
58           {
59             log.info(
60                 "SimpleChatHome for shard {} ignores not owned chat-room {}",
61                 shard,
62                 info);
63             return false;
64           }
65         })
66         .doOnNext(info ->
67         {
68           UUID chatRoomId = info.getId();
69           chatRoomInfo.put(chatRoomId, info);
70           Flux<Message> messageFlux =
71               storageStrategy.readChatRoomData(chatRoomId);
72           chatRoomData.put(
73               info.getId(),
74               new ChatRoomData(
75                   clock,
76                   new InMemoryChatMessageService(messageFlux),
77                   bufferSize));
78         })
79         .then()
80         .doOnSuccess(empty -> log.info("Restored {}", this))
81         .doOnError(throwable -> log.error("Could not restore {}", this))
82         .block();
83
84     this.clock = clock;
85     this.bufferSize = bufferSize;
86   }
87
88
89   @Override
90   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
91   {
92     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
93     ChatMessageService service = new InMemoryChatMessageService(Flux.empty());
94     ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
95     this.chatRoomInfo.put(id, chatRoomInfo);
96     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
97     this.chatRoomData.put(id, chatRoomData);
98     return Mono.just(chatRoomInfo);
99   }
100
101   @Override
102   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
103   {
104     return Mono
105         .justOrEmpty(chatRoomInfo.get(id))
106         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
107   }
108
109   @Override
110   public Flux<ChatRoomInfo> getChatRoomInfo()
111   {
112     return Flux.fromIterable(chatRoomInfo.values());
113   }
114
115   @Override
116   public Mono<ChatRoomData> getChatRoomData(UUID id)
117   {
118     return Mono
119         .justOrEmpty(chatRoomData.get(id))
120         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
121   }
122
123   @Override
124   public Mono<String[]> getShardOwners()
125   {
126     return Mono.empty();
127   }
128
129   @Override
130   public String toString()
131   {
132     return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
133   }
134 }