WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / ShardedChatHomeService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
5 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
6 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
10
11 import java.net.URI;
12 import java.util.Arrays;
13 import java.util.HashSet;
14 import java.util.Set;
15 import java.util.UUID;
16 import java.util.stream.Collectors;
17
18
19 @Slf4j
20 public class ShardedChatHomeService implements ChatHomeService
21 {
22   private final SimpleChatHomeService[] chatHomes;
23   private final Set<Integer> ownedShards;
24   private final String[] shardOwners;
25   private final ShardingStrategy shardingStrategy;
26
27
28   public ShardedChatHomeService(
29       SimpleChatHomeService[] chatHomes,
30       URI[] shardOwners,
31       ShardingStrategy shardingStrategy)
32   {
33     this.chatHomes = chatHomes;
34     this.shardOwners = Arrays
35         .stream(shardOwners)
36         .map(uri -> uri.toASCIIString())
37         .toArray(size -> new String[size]);
38     this.shardingStrategy = shardingStrategy;
39     this.ownedShards = new HashSet<>();
40     for (int shard = 0; shard < chatHomes.length; shard++)
41       if(chatHomes[shard] != null)
42         this.ownedShards.add(shard);
43     log.info(
44         "Created ShardedChatHome for shards: {}",
45         ownedShards
46             .stream()
47             .map(String::valueOf)
48             .collect(Collectors.joining(", ")));
49   }
50
51
52   @Override
53   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
54   {
55     int shard = shardingStrategy.selectShard(id);
56     return chatHomes[shard] == null
57         ? Mono.error(new ShardNotOwnedException(shard))
58         : chatHomes[shard].createChatRoom(id, name);
59   }
60
61   @Override
62   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
63   {
64     int shard = selectShard(id);
65     return chatHomes[shard] == null
66         ? Mono.error(new ShardNotOwnedException(shard))
67         : chatHomes[shard]
68             .getChatRoomInfo(id)
69             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
70             ? new UnknownChatroomException(
71                 id,
72                 shard,
73                 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
74             : throwable);
75   }
76
77   @Override
78   public Flux<ChatRoomInfo> getChatRoomInfo()
79   {
80     return Flux
81         .fromIterable(ownedShards)
82         .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
83   }
84
85   @Override
86   public Mono<ChatRoomData> getChatRoomData(UUID id)
87   {
88     int shard = selectShard(id);
89     return chatHomes[shard] == null
90         ? Mono.error(new ShardNotOwnedException(shard))
91         : chatHomes[shard]
92             .getChatRoomData(id)
93             .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
94                 ? new UnknownChatroomException(
95                 id,
96                 shard,
97                 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
98                 : throwable);
99   }
100
101   @Override
102   public Mono<String[]> getShardOwners()
103   {
104     return Mono.just(shardOwners);
105   }
106
107   private int selectShard(UUID chatroomId)
108   {
109     return shardingStrategy.selectShard(chatroomId);
110   }
111 }