feat: Prepared the application for sharding
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8
9 import java.util.HashMap;
10 import java.util.Map;
11 import java.util.UUID;
12
13
14 @Slf4j
15 public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
16 {
17   private final Map<UUID, ChatRoom>[] chatrooms;
18
19
20   public InMemoryChatHomeService(int numShards, Flux<ChatRoom> chatroomFlux)
21   {
22     log.debug("Creating InMemoryChatHomeService");
23     this.chatrooms = new Map[numShards];
24     for (int shard = 0; shard < numShards; shard++)
25         chatrooms[shard] = new HashMap<>();
26     chatroomFlux
27         .toStream()
28         .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
29   }
30
31   @Override
32   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
33   {
34     chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
35     return Mono.just(chatRoom);
36   }
37
38   @Override
39   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
40   {
41     return Mono.justOrEmpty(chatrooms[shard].get(id));
42   }
43
44   @Override
45   public Flux<ChatRoom> getChatRooms(int shard)
46   {
47     return Flux.fromStream(chatrooms[shard].values().stream());
48   }
49 }