1 package de.juplo.kafka.chat.backend.implementation.inmemory;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
10 import java.time.Clock;
/**
 * {@link ChatHomeService} implementation that keeps the info and the data of
 * its chat-rooms in in-memory maps, keyed by the id of the chat-room.
 */
15 public class SimpleChatHomeService implements ChatHomeService
// Shard this instance is restricted to. The ownership check in the constructor
// passes for every chat-room when this is null.
17 private final Integer shard;
// Info of the known chat-rooms, keyed by chat-room id.
18 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
// Data of the known chat-rooms, keyed by chat-room id.
19 private final Map<UUID, ChatRoomData> chatRoomData;
// Clock that is handed to every ChatRoomData created by createChatRoom().
20 private final Clock clock;
// Buffer size that is handed to every ChatRoomData created by createChatRoom().
21 private final int bufferSize;
// NOTE(review): only a part of the two constructors is visible in this
// listing; the comments below describe the visible statements only.
25 public SimpleChatHomeService(
26 StorageStrategy storageStrategy,
37 public SimpleChatHomeService(
39 StorageStrategy storageStrategy,
// NOTE(review): toString() reads the shard field. No field assignment is
// visible before this statement -- verify that shard is already assigned
// here, otherwise it is logged as null.
45 log.info("Created {}", this);
// Both lookup tables start out empty; entries are added further down.
47 this.chatRoomInfo = new HashMap<>();
48 this.chatRoomData = new HashMap<>();
// Ownership check: passes for every chat-room if no shard is configured.
// NOTE(review): shard is a boxed Integer. If info.getShard() returns an
// Integer as well, == compares references instead of values, which is only
// reliable for cached small values -- confirm the return type and consider
// an equals()-based comparison.
53 if (shard == null || info.getShard() == shard)
60 "SimpleChatHome for shard {} ignores not owned chat-room {}",
// Register the info of the chat-room under its id.
69 UUID chatRoomId = info.getId();
70 chatRoomInfo.put(chatRoomId, info);
// Messages of the chat-room, as supplied by the storage strategy; they are
// passed to the in-memory message service created below.
71 Flux<Message> messageFlux =
72 storageStrategy.readChatRoomData(chatRoomId);
77 new InMemoryChatMessageService(messageFlux),
81 this.bufferSize = bufferSize;
86 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
88 log.info("Creating ChatRoom with buffer-size {}", bufferSize);
89 ChatMessageService service = new InMemoryChatMessageService(Flux.empty());
90 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
91 this.chatRoomInfo.put(id, chatRoomInfo);
92 ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
93 this.chatRoomData.put(id, chatRoomData);
94 return Mono.just(chatRoomInfo);
98 public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
101 .justOrEmpty(chatRoomInfo.get(id))
102 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
106 public Flux<ChatRoomInfo> getChatRoomInfo()
108 return Flux.fromIterable(chatRoomInfo.values());
112 public Mono<ChatRoomData> getChatRoomData(UUID id)
115 .justOrEmpty(chatRoomData.get(id))
116 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
/**
 * Reports the shard owners as a string array wrapped in a {@link Mono}.
 *
 * NOTE(review): the method body is not visible in this listing; what the
 * array contains has to be confirmed against the body.
 */
120 public Mono<String[]> getShardOwners()
126 public String toString()
128 return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;