1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
8 import java.time.Clock;
13 public class SimpleChatHome implements ChatHome
15 private final Integer shard;
16 private final Map<UUID, ChatRoom> chatRooms;
17 private final Clock clock;
18 private final int bufferSize;
22 public SimpleChatHome(
23 Flux<ChatRoom> chatroomFlux,
27 this(null, chatroomFlux, clock, bufferSize);
30 public SimpleChatHome(
32 Flux<ChatRoom> chatroomFlux,
36 log.info("Created SimpleChatHome for shard {}", shard);
39 this.chatRooms = new HashMap<>();
43 if (shard == null || chatRoom.getShard() == shard)
50 "SimpleChatHome for shard {} ignores not owned chat-room {}",
57 .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom));
59 this.bufferSize = bufferSize;
64 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
66 log.info("Creating ChatRoom with buffer-size {}", bufferSize);
67 ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
68 ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
69 chatRooms.put(id, chatRoom);
70 return Mono.just(chatRoom);
74 public Mono<ChatRoom> getChatRoom(UUID id)
77 .justOrEmpty(chatRooms.get(id))
78 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
82 public Flux<ChatRoom> getChatRooms()
84 return Flux.fromIterable(chatRooms.values());