1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
9 import java.util.stream.IntStream;
/**
 * Chat-home service that keeps its {@link ChatRoom} instances in memory,
 * organised as one map per shard.
 */
13 public class InMemoryChatHomeService
// Indexed by shard number; each slot maps a chat-room id to its ChatRoom.
// getOwnedShards() reports exactly those indices whose slot is non-null.
15 private final Map<UUID, ChatRoom>[] chatrooms;
/**
 * Creates the service: allocates one slot per shard, determines the set of
 * owned shards and stores the chat-rooms delivered by {@code chatroomFlux}
 * in the map of their shard.
 *
 * NOTE(review): only part of the parameter list and of the body is visible
 * here; {@code numShards} is used below to size the slot array — confirm
 * the remaining parameters against the full file.
 *
 * @param chatroomFlux the chat-rooms to load into this service
 */
18 public InMemoryChatHomeService(
21 Flux<ChatRoom> chatroomFlux)
23 log.debug("Creating InMemoryChatHomeService");
// Raw Map[] creation: Java does not permit creating an array of a
// parameterized type, so the generic type is only present on the field.
24 this.chatrooms = new Map[numShards];
// Gather the owned shard numbers into a Set<Integer> for the contains()
// checks below. The three lambdas create the set, add one element and merge
// two partial sets — presumably the arguments of a collect(...) call whose
// preceding lines are not visible here; TODO confirm.
25 Set<Integer> owned = Arrays
28 () -> new HashSet<>(),
29 (set, i) -> set.add(i),
30 (a, b) -> a.addAll(b));
// Initialise the slot of every shard index depending on whether the shard
// is owned. NOTE(review): the two alternatives are not visible here;
// getOwnedShards() relies on slots of not-owned shards being null — confirm.
31 for (int shard = 0; shard < numShards; shard++)
33 chatrooms[shard] = owned.contains(shard)
// Distinguish chat-rooms by ownership of their shard; a chat-room of a
// not-owned shard is reported in the log (presumably it is then skipped —
// the surrounding lines are not visible here).
40 if (owned.contains(chatRoom.getShard()))
46 log.info("Ignoring not owned chat-room {}", chatRoom);
// Store each remaining chat-room in the map of its shard, keyed by its id.
51 .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
/**
 * Stores the given chat-room in the map of its shard, keyed by its id.
 *
 * NOTE(review): if the slot of that shard is null (presumably a shard that
 * is not owned), this dereference throws a NullPointerException — confirm
 * that callers only pass chat-rooms of owned shards.
 *
 * @param chatRoom the chat-room to store; its shard selects the map and its
 *                 id is used as the key
 */
54 public void putChatRoom(ChatRoom chatRoom)
56 chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
/**
 * Looks up a chat-room by id in the map of the given shard.
 *
 * NOTE(review): a null slot for {@code shard} (presumably a not-owned
 * shard) would raise a NullPointerException here rather than yield an
 * empty Mono — confirm this is intended.
 *
 * @param shard index of the shard whose map is searched
 * @param id    id of the requested chat-room
 * @return a Mono that emits the chat-room, or completes empty if the map of
 *         that shard has no entry for {@code id}
 */
59 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
// Map.get() returns null for an unknown id; justOrEmpty() turns that null
// into an empty Mono.
61 return Mono.justOrEmpty(chatrooms[shard].get(id));
/**
 * Reports the shards for which this service holds a map.
 *
 * NOTE(review): the start and the terminal operation of the stream are not
 * visible here; presumably the surviving indices are returned as an array.
 *
 * @return the shard indices whose slot in {@code chatrooms} is non-null
 */
64 public int[] getOwnedShards()
// Walk all shard indices ...
67 .range(0, chatrooms.length)
// ... and keep those that have a map assigned.
68 .filter(i -> chatrooms[i] != null)
/**
 * Returns all chat-rooms currently stored for the given shard.
 *
 * NOTE(review): Flux.fromStream(Stream) wraps a single-use java.util.stream
 * Stream, so subscribing to the returned Flux more than once may fail —
 * confirm that callers subscribe only once, or consider
 * Flux.fromIterable(...) on the values collection.
 * NOTE(review): a null slot for {@code shard} (presumably a not-owned
 * shard) raises a NullPointerException here — confirm callers.
 *
 * @param shard index of the shard whose chat-rooms are requested
 * @return a Flux over the values of that shard's map
 */
72 public Flux<ChatRoom> getChatRooms(int shard)
74 return Flux.fromStream(chatrooms[shard].values().stream());