95a30dbd124045b217e304c735bbbd8a88938182
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.util.*;
11
12
13 @Slf4j
14 public class InMemoryChatHomeService implements ChatHomeService
15 {
16   private final ShardingStrategy shardingStrategy;
17   private final Map<UUID, ChatRoom>[] chatrooms;
18
19
20   public InMemoryChatHomeService(
21       ShardingStrategy shardingStrategy,
22       int numShards,
23       int[] ownedShards,
24       Flux<ChatRoom> chatroomFlux)
25   {
26     log.debug("Creating InMemoryChatHomeService");
27     this.shardingStrategy = shardingStrategy;
28     this.chatrooms = new Map[numShards];
29     Set<Integer> owned = Arrays
30         .stream(ownedShards)
31         .collect(
32             () -> new HashSet<>(),
33             (set, i) -> set.add(i),
34             (a, b) -> a.addAll(b));
35     for (int shard = 0; shard < numShards; shard++)
36     {
37       chatrooms[shard] = owned.contains(shard)
38           ? new HashMap<>()
39           : null;
40     }
41     chatroomFlux
42         .filter(chatRoom ->
43         {
44           int shard = shardingStrategy.selectShard(chatRoom.getId());
45           if (owned.contains(shard))
46           {
47             return true;
48           }
49           else
50           {
51             log.info("Ignoring not owned chat-room {}", chatRoom);
52             return false;
53           }
54         })
55         .toStream()
56         .forEach(chatRoom ->
57         {
58           getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
59         });
60   }
61
62   @Override
63   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
64   {
65     getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
66     return Mono.just(chatRoom);
67   }
68
69   @Override
70   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
71   {
72     return Mono.justOrEmpty(chatrooms[shard].get(id));
73   }
74
75   @Override
76   public Flux<ChatRoom> getChatRooms(int shard)
77   {
78     return Flux.fromStream(chatrooms[shard].values().stream());
79   }
80
81
82   private Map<UUID, ChatRoom> getChatRoomMapFor(ChatRoom chatRoom)
83   {
84     int shard = shardingStrategy.selectShard(chatRoom.getId());
85     return chatrooms[shard];
86   }
87 }