8f262a0b7def4abfb9716c1cfaca9300545349b2
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8
9 import java.util.*;
10
11
12 @Slf4j
13 public class InMemoryChatHomeService implements ChatHomeService
14 {
15   private final Map<UUID, ChatRoom>[] chatrooms;
16
17
18   public InMemoryChatHomeService(
19       int numShards,
20       int[] ownedShards,
21       Flux<ChatRoom> chatroomFlux)
22   {
23     log.debug("Creating InMemoryChatHomeService");
24     this.chatrooms = new Map[numShards];
25     Set<Integer> owned = Arrays
26         .stream(ownedShards)
27         .collect(
28             () -> new HashSet<>(),
29             (set, i) -> set.add(i),
30             (a, b) -> a.addAll(b));
31     for (int shard = 0; shard < numShards; shard++)
32     {
33       chatrooms[shard] = owned.contains(shard)
34           ? new HashMap<>()
35           : null;
36     }
37     chatroomFlux
38         .filter(chatRoom ->
39         {
40           if (owned.contains(chatRoom.getShard()))
41           {
42             return true;
43           }
44           else
45           {
46             log.info("Ignoring not owned chat-room {}", chatRoom);
47             return false;
48           }
49         })
50         .toStream()
51         .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
52   }
53
54   public void putChatRoom(ChatRoom chatRoom)
55   {
56     chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
57   }
58
59   @Override
60   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
61   {
62     return Mono.justOrEmpty(chatrooms[shard].get(id));
63   }
64
65   @Override
66   public Flux<ChatRoom> getChatRooms(int shard)
67   {
68     return Flux.fromStream(chatrooms[shard].values().stream());
69   }
70 }