refactor: Removed Interface `ChatHomeService`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
7
8 import java.util.*;
9
10
11 @Slf4j
12 public class InMemoryChatHomeService
13 {
14   private final Map<UUID, ChatRoom>[] chatrooms;
15
16
17   public InMemoryChatHomeService(
18       int numShards,
19       int[] ownedShards,
20       Flux<ChatRoom> chatroomFlux)
21   {
22     log.debug("Creating InMemoryChatHomeService");
23     this.chatrooms = new Map[numShards];
24     Set<Integer> owned = Arrays
25         .stream(ownedShards)
26         .collect(
27             () -> new HashSet<>(),
28             (set, i) -> set.add(i),
29             (a, b) -> a.addAll(b));
30     for (int shard = 0; shard < numShards; shard++)
31     {
32       chatrooms[shard] = owned.contains(shard)
33           ? new HashMap<>()
34           : null;
35     }
36     chatroomFlux
37         .filter(chatRoom ->
38         {
39           if (owned.contains(chatRoom.getShard()))
40           {
41             return true;
42           }
43           else
44           {
45             log.info("Ignoring not owned chat-room {}", chatRoom);
46             return false;
47           }
48         })
49         .toStream()
50         .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
51   }
52
53   public void putChatRoom(ChatRoom chatRoom)
54   {
55     chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
56   }
57
58   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
59   {
60     return Mono.justOrEmpty(chatrooms[shard].get(id));
61   }
62
63   public Flux<ChatRoom> getChatRooms(int shard)
64   {
65     return Flux.fromStream(chatrooms[shard].values().stream());
66   }
67 }