WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / ShardedChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
6 import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
10
11 import java.util.HashSet;
12 import java.util.Set;
13 import java.util.UUID;
14 import java.util.stream.Collectors;
15
16
17 @Slf4j
18 public class ShardedChatHome implements ChatHome
19 {
20   private final ChatHome[] chatHomes;
21   private final Set<Integer> ownedShards;
22   private final ShardingStrategy shardingStrategy;
23
24
25   public  ShardedChatHome(
26       ChatHome[] chatHomes,
27       ShardingStrategy shardingStrategy)
28   {
29     this.chatHomes = chatHomes;
30     this.shardingStrategy = shardingStrategy;
31     this.ownedShards = new HashSet<>();
32     for (int shard = 0; shard < chatHomes.length; shard++)
33       if(chatHomes[shard] != null)
34         this.ownedShards.add(shard);
35     log.info(
36         "Created ShardedChatHome for shards: {}",
37         ownedShards
38             .stream()
39             .map(String::valueOf)
40             .collect(Collectors.joining(", ")));
41   }
42
43
44   @Override
45   public Mono<ChatRoom> getChatRoom(UUID id)
46   {
47     int shard = selectShard(id);
48     return chatHomes[shard] == null
49         ? Mono.error(new ShardNotOwnedException(shard))
50         : chatHomes[shard]
51             .getChatRoom(id)
52             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
53             ? Mono.error(new UnknownChatroomException(id, shard, ownedShards))
54             : Mono.error(throwable));
55   }
56
57   @Override
58   public Flux<ChatRoom> getChatRooms()
59   {
60     return Flux
61         .fromIterable(ownedShards)
62         .flatMap(shard -> chatHomes[shard].getChatRooms());
63   }
64
65
66   private int selectShard(UUID chatroomId)
67   {
68     return shardingStrategy.selectShard(chatroomId);
69   }
70 }