WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
7
8 import java.util.*;
9 import java.util.stream.IntStream;
10
11
12 @Slf4j
13 public class InMemoryChatHomeService
14 {
15   private final Map<UUID, ChatRoom>[] chatrooms;
16
17
18   public InMemoryChatHomeService(
19       int numShards,
20       int[] ownedShards,
21       Flux<ChatRoom> chatroomFlux)
22   {
23     log.debug("Creating InMemoryChatHomeService");
24     this.chatrooms = new Map[numShards];
25     Set<Integer> owned = Arrays
26         .stream(ownedShards)
27         .collect(
28             () -> new HashSet<>(),
29             (set, i) -> set.add(i),
30             (a, b) -> a.addAll(b));
31     for (int shard = 0; shard < numShards; shard++)
32     {
33       chatrooms[shard] = owned.contains(shard)
34           ? new HashMap<>()
35           : null;
36     }
37     chatroomFlux
38         .filter(chatRoom ->
39         {
40           if (owned.contains(chatRoom.getShard()))
41           {
42             return true;
43           }
44           else
45           {
46             log.info("Ignoring not owned chat-room {}", chatRoom);
47             return false;
48           }
49         })
50         .toStream()
51         .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
52   }
53
54   public void putChatRoom(ChatRoom chatRoom)
55   {
56     chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
57   }
58
59   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
60   {
61     return Mono.justOrEmpty(chatrooms[shard].get(id));
62   }
63
64   public int[] getOwnedShards()
65   {
66     return IntStream
67         .range(0, chatrooms.length)
68         .filter(i -> chatrooms[i] != null)
69         .toArray();
70   }
71
72   public Flux<ChatRoom> getChatRooms(int shard)
73   {
74     return Flux.fromStream(chatrooms[shard].values().stream());
75   }
76 }