1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
9 import java.time.Clock;
14 public class SimpleChatHome implements ChatHome
16 private final Integer shard;
17 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
18 private final Map<UUID, ChatRoomData> chatRoomData;
19 private final Clock clock;
20 private final int bufferSize;
24 public SimpleChatHome(
25 StorageStrategy storageStrategy,
36 public SimpleChatHome(
38 StorageStrategy storageStrategy,
42 log.info("Created SimpleChatHome for shard {}", shard);
45 this.chatRoomInfo = new HashMap<>();
46 this.chatRoomData = new HashMap<>();
51 if (shard == null || info.getShard() == shard)
58 "SimpleChatHome for shard {} ignores not owned chat-room {}",
67 UUID chatRoomId = info.getId();
68 chatRoomInfo.put(chatRoomId, info);
69 Flux<Message> messageFlux =
70 storageStrategy.readChatRoomData(chatRoomId);
75 new InMemoryChatRoomService(messageFlux),
79 this.bufferSize = bufferSize;
84 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
86 log.info("Creating ChatRoom with buffer-size {}", bufferSize);
87 ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
88 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
89 this.chatRoomInfo.put(id, chatRoomInfo);
90 ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
91 this.chatRoomData.put(id, chatRoomData);
92 return Mono.just(chatRoomInfo);
96 public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
99 .justOrEmpty(chatRoomInfo.get(id))
100 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
104 public Flux<ChatRoomInfo> getChatRoomInfo()
106 return Flux.fromIterable(chatRoomInfo.values());
110 public Mono<ChatRoomData> getChatRoomData(UUID id)
113 .justOrEmpty(chatRoomData.get(id))
114 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
117 public Flux<ChatRoomData> getChatRoomData()
119 return Flux.fromIterable(chatRoomData.values());