1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
6 import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
11 import java.util.HashSet;
13 import java.util.UUID;
14 import java.util.stream.Collectors;
18 public class ShardedChatHome implements ChatHome
20 private final ChatHome[] chatHomes;
21 private final Set<Integer> ownedShards;
22 private final ShardingStrategy shardingStrategy;
25 public ShardedChatHome(
27 ShardingStrategy shardingStrategy)
29 this.chatHomes = chatHomes;
30 this.shardingStrategy = shardingStrategy;
31 this.ownedShards = new HashSet<>();
32 for (int shard = 0; shard < chatHomes.length; shard++)
33 if(chatHomes[shard] != null)
34 this.ownedShards.add(shard);
36 "Created ShardedChatHome for shards: {}",
40 .collect(Collectors.joining(", ")));
45 public Mono<ChatRoom> getChatRoom(UUID id)
47 int shard = selectShard(id);
48 return chatHomes[shard] == null
49 ? Mono.error(new ShardNotOwnedException(shard))
52 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
53 ? new UnknownChatroomException(
56 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
61 public Flux<ChatRoom> getChatRooms()
64 .fromIterable(ownedShards)
65 .flatMap(shard -> chatHomes[shard].getChatRooms());
69 private int selectShard(UUID chatroomId)
71 return shardingStrategy.selectShard(chatroomId);