25a9bcf60ae07143cdd2b18e7485993463eb0f2a
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
7
8 import java.util.*;
9
10
11 @Slf4j
12 public class InMemoryChatHomeService implements ChatHomeService
13 {
14   private final Map<UUID, ChatRoom>[] chatrooms;
15   private final Set<Integer> ownedShards;
16   private final ShardingStrategy shardingStrategy;
17
18
19   public InMemoryChatHomeService(
20       int numShards,
21       int[] ownedShards,
22       ShardingStrategy shardingStrategy,
23       Flux<ChatRoom> chatroomFlux)
24   {
25     log.debug("Creating InMemoryChatHomeService");
26
27     this.chatrooms = new Map[numShards];
28
29     this.ownedShards = Arrays
30         .stream(ownedShards)
31         .collect(
32             () -> new HashSet<>(),
33             (set, i) -> set.add(i),
34             (a, b) -> a.addAll(b));
35
36     this.shardingStrategy = shardingStrategy;
37
38     for (int shard = 0; shard < numShards; shard++)
39     {
40       chatrooms[shard] = this.ownedShards.contains(shard)
41           ? new HashMap<>()
42           : null;
43     }
44     chatroomFlux
45         .filter(chatRoom ->
46         {
47           if (this.ownedShards.contains(chatRoom.getShard()))
48           {
49             return true;
50           }
51           else
52           {
53             log.info("Ignoring not owned chat-room {}", chatRoom);
54             return false;
55           }
56         })
57         .toStream()
58         .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
59   }
60
61   void putChatRoom(ChatRoom chatRoom)
62   {
63     UUID id = chatRoom.getId();
64     int shard = shardingStrategy.selectShard(id);
65     if (!ownedShards.contains(shard))
66       throw new ShardNotOwnedException(this, chatRoom, shard, ownedShards);
67     chatrooms[shard].put(id, chatRoom);
68   }
69
70   @Override
71   public Mono<ChatRoom> getChatRoom(UUID id)
72   {
73     int shard = shardingStrategy.selectShard(id);
74     if (ownedShards.contains(shard))
75     {
76       return Mono.justOrEmpty(chatrooms[shard].get(id));
77     }
78     else
79     {
80       int[] ownedShards = new int[this.ownedShards.size()];
81       Iterator<Integer> iterator = this.ownedShards.iterator();
82       for (int i = 0; iterator.hasNext(); i++)
83       {
84         ownedShards[i] = iterator.next();
85       }
86       return Mono.error(new UnknownChatroomException(
87           id,
88           shard,
89           ownedShards));
90     }
91   }
92
93   @Override
94   public Flux<ChatRoom> getChatRooms()
95   {
96     return Flux
97         .fromIterable(ownedShards)
98         .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
99   }
100 }