1 package de.juplo.kafka.chat.backend.implementation.inmemory;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
5 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
6 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
12 import java.util.Arrays;
13 import java.util.HashSet;
15 import java.util.UUID;
16 import java.util.stream.Collectors;
20 public class ShardedChatHomeService implements ChatHomeService
// Per-shard chat homes, indexed by shard number. A null entry marks a shard
// for which this instance holds no chat home (see the null checks in the methods).
22 private final SimpleChatHomeService[] chatHomes;
// Shard numbers whose entry in chatHomes is non-null; filled once in the constructor.
23 private final Set<Integer> ownedShards;
// Owner addresses rendered as ASCII strings; handed out by getShardOwners().
// NOTE(review): presumably indexed by shard number — confirm against the caller.
24 private final String[] shardOwners;
// Maps a chat-room id to the number of the shard used to index chatHomes.
25 private final ShardingStrategy shardingStrategy;
/**
 * Stores the per-shard chat homes and the sharding strategy, converts the shard
 * owners to ASCII strings and records which shards have a chat home.
 *
 * @param chatHomes        one slot per shard; a null slot is treated as "shard not owned"
 * @param shardingStrategy strategy that selects the shard for a chat-room id
 */
28 public ShardedChatHomeService(
29 SimpleChatHomeService[] chatHomes,
// NOTE(review): this listing skips a line here — presumably a parameter that
// supplies the owner URIs converted below; confirm against the full file.
31 ShardingStrategy shardingStrategy)
33 this.chatHomes = chatHomes;
// Each owner is converted with toASCIIString() and collected into a String[].
// NOTE(review): the stream source is not visible in this listing.
34 this.shardOwners = Arrays
36 .map(uri -> uri.toASCIIString())
37 .toArray(size -> new String[size]);
38 this.shardingStrategy = shardingStrategy;
39 this.ownedShards = new HashSet<>();
// Collect the indices of all non-null chat homes; these are the owned shards.
40 for (int shard = 0; shard < chatHomes.length; shard++)
41 if(chatHomes[shard] != null)
42 this.ownedShards.add(shard);
// Message template and a ", "-joined argument.
// NOTE(review): the enclosing call is not visible here — presumably a log statement.
44 "Created ShardedChatHome for shards: {}",
48 .collect(Collectors.joining(", ")));
53 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
55 int shard = shardingStrategy.selectShard(id);
56 return chatHomes[shard] == null
57 ? Mono.error(new ShardNotOwnedException(shard))
58 : chatHomes[shard].createChatRoom(id, name);
/**
 * Looks up the info of a single chat room on the shard selected for its id.
 *
 * @param id the id of the chat room
 * @return a Mono that fails with ShardNotOwnedException when no chat home is held
 *     for the selected shard; otherwise the result of the owned-shard branch below
 */
62 public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
64 int shard = selectShard(id);
65 return chatHomes[shard] == null
66 ? Mono.error(new ShardNotOwnedException(shard))
// NOTE(review): the start of the owned-shard branch is missing from this listing —
// presumably a call on chatHomes[shard]; confirm against the full file.
// An UnknownChatroomException is replaced by a new UnknownChatroomException whose
// last argument is the set of owned shard numbers as an int[].
69 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
70 ? new UnknownChatroomException(
// NOTE(review): leading constructor arguments and the else-branch of the ternary
// are not visible in this listing.
73 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
/**
 * Lists the chat-room infos of every shard this instance owns.
 *
 * @return a Flux built from the getChatRoomInfo() results of all owned chat homes
 */
78 public Flux<ChatRoomInfo> getChatRoomInfo()
// NOTE(review): the receiver of this call chain is missing from the listing —
// presumably "return Flux"; confirm against the full file.
81 .fromIterable(ownedShards)
// ownedShards only holds indices whose chatHomes entry was non-null at construction,
// so the array access is not guarded by a null check here.
82 .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
/**
 * Looks up the data of a single chat room on the shard selected for its id.
 *
 * @param id the id of the chat room
 * @return a Mono that fails with ShardNotOwnedException when no chat home is held
 *     for the selected shard; otherwise the result of the owned-shard branch below
 */
86 public Mono<ChatRoomData> getChatRoomData(UUID id)
88 int shard = selectShard(id);
89 return chatHomes[shard] == null
90 ? Mono.error(new ShardNotOwnedException(shard))
// NOTE(review): the start of the owned-shard branch is missing from this listing —
// presumably a call on chatHomes[shard]; confirm against the full file.
// An UnknownChatroomException is replaced by a new UnknownChatroomException whose
// last argument is the set of owned shard numbers as an int[].
93 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
94 ? new UnknownChatroomException(
// NOTE(review): leading constructor arguments and the else-branch of the ternary
// are not visible in this listing.
97 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
102 public Mono<String[]> getShardOwners()
104 return Mono.just(shardOwners);
107 private int selectShard(UUID chatroomId)
109 return shardingStrategy.selectShard(chatroomId);