2041f530f152cbc3dea4c0e2075628cc41939ecd
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / ShardedChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8
9 import java.util.HashSet;
10 import java.util.Set;
11 import java.util.UUID;
12 import java.util.stream.Collectors;
13
14
15 @Slf4j
16 public class ShardedChatHome implements ChatHome
17 {
18   private final SimpleChatHome[] chatHomes;
19   private final Set<Integer> ownedShards;
20   private final ShardingStrategy shardingStrategy;
21
22
23   public  ShardedChatHome(
24       SimpleChatHome[] chatHomes,
25       ShardingStrategy shardingStrategy)
26   {
27     this.chatHomes = chatHomes;
28     this.shardingStrategy = shardingStrategy;
29     this.ownedShards = new HashSet<>();
30     for (int shard = 0; shard < chatHomes.length; shard++)
31       if(chatHomes[shard] != null)
32         this.ownedShards.add(shard);
33     log.info(
34         "Created ShardedChatHome for shards: {}",
35         ownedShards
36             .stream()
37             .map(String::valueOf)
38             .collect(Collectors.joining(", ")));
39   }
40
41
42   @Override
43   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
44   {
45     int shard = shardingStrategy.selectShard(id);
46     return chatHomes[shard] == null
47         ? Mono.error(new ShardNotOwnedException(shard))
48         : chatHomes[shard].createChatRoom(id, name);
49   }
50
51   @Override
52   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
53   {
54     int shard = selectShard(id);
55     return chatHomes[shard] == null
56         ? Mono.error(new ShardNotOwnedException(shard))
57         : chatHomes[shard]
58             .getChatRoomInfo(id)
59             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
60             ? new UnknownChatroomException(
61                 id,
62                 shard,
63                 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
64             : throwable);
65   }
66
67   @Override
68   public Flux<ChatRoomInfo> getChatRoomInfo()
69   {
70     return Flux
71         .fromIterable(ownedShards)
72         .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
73   }
74
75   @Override
76   public Mono<ChatRoomData> getChatRoomData(UUID id)
77   {
78     int shard = selectShard(id);
79     return chatHomes[shard] == null
80         ? Mono.error(new ShardNotOwnedException(shard))
81         : chatHomes[shard]
82             .getChatRoomData(id)
83             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
84                 ? new UnknownChatroomException(
85                 id,
86                 shard,
87                 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
88                 : throwable);
89   }
90
91   public Flux<ChatRoomData> getChatRoomData()
92   {
93     return Flux
94         .fromIterable(ownedShards)
95         .flatMap(shard -> chatHomes[shard].getChatRoomData());
96   }
97
98
99
100   private int selectShard(UUID chatroomId)
101   {
102     return shardingStrategy.selectShard(chatroomId);
103   }
104 }