refactor: Split `ChatRoomInfo` and `ChatRoomData` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / ShardedChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
7
8 import java.util.HashSet;
9 import java.util.Set;
10 import java.util.UUID;
11 import java.util.stream.Collectors;
12
13
14 @Slf4j
15 public class ShardedChatHome implements ChatHome
16 {
17   private final SimpleChatHome[] chatHomes;
18   private final Set<Integer> ownedShards;
19   private final ShardingStrategy shardingStrategy;
20
21
22   public  ShardedChatHome(
23       SimpleChatHome[] chatHomes,
24       ShardingStrategy shardingStrategy)
25   {
26     this.chatHomes = chatHomes;
27     this.shardingStrategy = shardingStrategy;
28     this.ownedShards = new HashSet<>();
29     for (int shard = 0; shard < chatHomes.length; shard++)
30       if(chatHomes[shard] != null)
31         this.ownedShards.add(shard);
32     log.info(
33         "Created ShardedChatHome for shards: {}",
34         ownedShards
35             .stream()
36             .map(String::valueOf)
37             .collect(Collectors.joining(", ")));
38   }
39
40
41   @Override
42   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
43   {
44     int shard = shardingStrategy.selectShard(id);
45     return chatHomes[shard] == null
46         ? Mono.error(new ShardNotOwnedException(shard))
47         : chatHomes[shard].createChatRoom(id, name);
48   }
49
50   @Override
51   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
52   {
53     int shard = selectShard(id);
54     return chatHomes[shard] == null
55         ? Mono.error(new ShardNotOwnedException(shard))
56         : chatHomes[shard]
57             .getChatRoomInfo(id)
58             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
59             ? new UnknownChatroomException(
60                 id,
61                 shard,
62                 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
63             : throwable);
64   }
65
66   @Override
67   public Flux<ChatRoomInfo> getChatRoomInfo()
68   {
69     return Flux
70         .fromIterable(ownedShards)
71         .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
72   }
73
74   @Override
75   public Mono<ChatRoomData> getChatRoomData(UUID id)
76   {
77     int shard = selectShard(id);
78     return chatHomes[shard] == null
79         ? Mono.error(new ShardNotOwnedException(shard))
80         : chatHomes[shard]
81             .getChatRoomData(id)
82             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
83                 ? new UnknownChatroomException(
84                 id,
85                 shard,
86                 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
87                 : throwable);
88   }
89
90   public Flux<ChatRoomData> getChatRoomData()
91   {
92     return Flux
93         .fromIterable(ownedShards)
94         .flatMap(shard -> chatHomes[shard].getChatRoomData());
95   }
96
97
98
99   private int selectShard(UUID chatroomId)
100   {
101     return shardingStrategy.selectShard(chatroomId);
102   }
103 }