refactor: `ShardNotOwnedException` should be send over `Mono.send()`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / ShardedChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.util.HashSet;
11 import java.util.Set;
12 import java.util.UUID;
13 import java.util.stream.Collectors;
14
15
16 @Slf4j
17 public class ShardedChatHome implements ChatHome
18 {
19   private final ChatHome[] chatHomes;
20   private final Set<Integer> ownedShards;
21   private final ShardingStrategy shardingStrategy;
22
23
24   public  ShardedChatHome(
25       ChatHome[] chatHomes,
26       ShardingStrategy shardingStrategy)
27   {
28     this.chatHomes = chatHomes;
29     this.shardingStrategy = shardingStrategy;
30     this.ownedShards = new HashSet<>();
31     for (int shard = 0; shard < chatHomes.length; shard++)
32       if(chatHomes[shard] != null)
33         this.ownedShards.add(shard);
34     log.info(
35         "Created ShardedChatHome for shards: {}",
36         ownedShards
37             .stream()
38             .map(String::valueOf)
39             .collect(Collectors.joining(", ")));
40   }
41
42
43   @Override
44   public Mono<ChatRoom> getChatRoom(UUID id)
45   {
46     int shard = selectShard(id);
47     return chatHomes[shard] == null
48         ? Mono.error(new ShardNotOwnedException(shard))
49         : chatHomes[shard].getChatRoom(id);
50   }
51
52   @Override
53   public Flux<ChatRoom> getChatRooms()
54   {
55     return Flux
56         .fromIterable(ownedShards)
57         .flatMap(shard -> chatHomes[shard].getChatRooms());
58   }
59
60
61   private int selectShard(UUID chatroomId)
62   {
63     return shardingStrategy.selectShard(chatroomId);
64   }
65 }