1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
9 import java.util.HashMap;
11 import java.util.UUID;
15 public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
17 private final Map<UUID, ChatRoom>[] chatrooms;
20 public InMemoryChatHomeService(int numShards, Flux<ChatRoom> chatroomFlux)
22 log.debug("Creating InMemoryChatHomeService");
23 this.chatrooms = new Map[numShards];
24 for (int shard = 0; shard < numShards; shard++)
25 chatrooms[shard] = new HashMap<>();
28 .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
32 public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
34 chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
35 return Mono.just(chatRoom);
39 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
41 return Mono.justOrEmpty(chatrooms[shard].get(id));
45 public Flux<ChatRoom> getChatRooms(int shard)
47 return Flux.fromStream(chatrooms[shard].values().stream());