refactor: Moved exceptions into package `exceptions` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / ShardedChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
5 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
6 import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
10
11 import java.util.HashSet;
12 import java.util.Set;
13 import java.util.UUID;
14 import java.util.stream.Collectors;
15
16
17 @Slf4j
18 public class ShardedChatHome implements ChatHome
19 {
20   private final SimpleChatHome[] chatHomes;
21   private final Set<Integer> ownedShards;
22   private final ShardingStrategy shardingStrategy;
23
24
25   public  ShardedChatHome(
26       SimpleChatHome[] chatHomes,
27       ShardingStrategy shardingStrategy)
28   {
29     this.chatHomes = chatHomes;
30     this.shardingStrategy = shardingStrategy;
31     this.ownedShards = new HashSet<>();
32     for (int shard = 0; shard < chatHomes.length; shard++)
33       if(chatHomes[shard] != null)
34         this.ownedShards.add(shard);
35     log.info(
36         "Created ShardedChatHome for shards: {}",
37         ownedShards
38             .stream()
39             .map(String::valueOf)
40             .collect(Collectors.joining(", ")));
41   }
42
43
44   @Override
45   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
46   {
47     int shard = shardingStrategy.selectShard(id);
48     return chatHomes[shard] == null
49         ? Mono.error(new ShardNotOwnedException(shard))
50         : chatHomes[shard].createChatRoom(id, name);
51   }
52
53   @Override
54   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
55   {
56     int shard = selectShard(id);
57     return chatHomes[shard] == null
58         ? Mono.error(new ShardNotOwnedException(shard))
59         : chatHomes[shard]
60             .getChatRoomInfo(id)
61             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
62             ? new UnknownChatroomException(
63                 id,
64                 shard,
65                 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
66             : throwable);
67   }
68
69   @Override
70   public Flux<ChatRoomInfo> getChatRoomInfo()
71   {
72     return Flux
73         .fromIterable(ownedShards)
74         .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
75   }
76
77   @Override
78   public Mono<ChatRoomData> getChatRoomData(UUID id)
79   {
80     int shard = selectShard(id);
81     return chatHomes[shard] == null
82         ? Mono.error(new ShardNotOwnedException(shard))
83         : chatHomes[shard]
84             .getChatRoomData(id)
85             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
86                 ? new UnknownChatroomException(
87                 id,
88                 shard,
89                 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
90                 : throwable);
91   }
92
93   public Flux<ChatRoomData> getChatRoomData()
94   {
95     return Flux
96         .fromIterable(ownedShards)
97         .flatMap(shard -> chatHomes[shard].getChatRoomData());
98   }
99
100
101
102   private int selectShard(UUID chatroomId)
103   {
104     return shardingStrategy.selectShard(chatroomId);
105   }
106 }