1 package de.juplo.kafka.chat.backend.implementation.inmemory;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
10 import java.time.Clock;
15 public class SimpleChatHomeService implements ChatHomeService
17 private final Integer shard;
18 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
19 private final Map<UUID, ChatRoomData> chatRoomData;
20 private final Clock clock;
21 private final int historyLimit;
25 public SimpleChatHomeService(
35 public SimpleChatHomeService(
40 log.debug("Creating SimpleChatHomeService");
43 this.chatRoomInfo = new HashMap<>();
44 this.chatRoomData = new HashMap<>();
46 this.historyLimit = historyLimit;
50 Mono<Void> restore(StorageStrategy storageStrategy)
55 return storageStrategy
59 if (shard == null || info.getShard() == shard)
66 "SimpleChatHome for shard {} ignores not owned chat-room {}",
74 UUID chatRoomId = info.getId();
75 InMemoryChatMessageService chatMessageService =
76 new InMemoryChatMessageService(chatRoomId);
78 chatRoomInfo.put(chatRoomId, info);
79 ChatRoomData chatRoomData =
84 chatRoomData.activate();
85 this.chatRoomData.put(info.getId(), chatRoomData);
87 return chatMessageService.restore(storageStrategy);
90 .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count))
91 .doOnError(throwable -> log.error("Could not restore {}", this))
97 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
99 log.info("Creating ChatRoom with history-limit {}", historyLimit);
100 ChatMessageService service = new InMemoryChatMessageService(id);
101 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
102 this.chatRoomInfo.put(id, chatRoomInfo);
103 ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
104 chatRoomData.activate();
105 this.chatRoomData.put(id, chatRoomData);
106 return Mono.just(chatRoomInfo);
110 public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
113 .justOrEmpty(chatRoomInfo.get(id))
114 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
118 public Flux<ChatRoomInfo> getChatRoomInfo()
120 return Flux.fromIterable(chatRoomInfo.values());
124 public Mono<ChatRoomData> getChatRoomData(UUID id)
127 .justOrEmpty(chatRoomData.get(id))
128 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
132 public Mono<String[]> getShardOwners()
138 public String toString()
140 return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;