1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
10 import java.util.HashSet;
12 import java.util.UUID;
13 import java.util.stream.Collectors;
17 public class ShardedChatHome implements ChatHome
19 private final ChatHome[] chatHomes;
20 private final Set<Integer> ownedShards;
21 private final ShardingStrategy shardingStrategy;
24 public ShardedChatHome(
26 ShardingStrategy shardingStrategy)
28 this.chatHomes = chatHomes;
29 this.shardingStrategy = shardingStrategy;
30 this.ownedShards = new HashSet<>();
31 for (int shard = 0; shard < chatHomes.length; shard++)
32 if(chatHomes[shard] != null)
33 this.ownedShards.add(shard);
35 "Created ShardedChatHome for shards: {}",
39 .collect(Collectors.joining(", ")));
44 public Mono<ChatRoom> getChatRoom(UUID id)
46 int shard = selectShard(id);
47 if (chatHomes[shard] == null)
48 throw new ShardNotOwnedException(shard);
49 return chatHomes[shard].getChatRoom(id);
53 public Flux<ChatRoom> getChatRooms()
56 .fromIterable(ownedShards)
57 .flatMap(shard -> chatHomes[shard].getChatRooms());
61 private int selectShard(UUID chatroomId)
63 return shardingStrategy.selectShard(chatroomId);