1 package de.juplo.kafka.chat.backend.domain;
3 import lombok.extern.slf4j.Slf4j;
4 import reactor.core.publisher.Flux;
5 import reactor.core.publisher.Mono;
7 import java.util.HashSet;
10 import java.util.stream.Collectors;
14 public class ShardedChatHome implements ChatHome
16 private final ChatHome[] chatHomes;
17 private final Set<Integer> ownedShards;
18 private final ShardingStrategy shardingStrategy;
21 public ShardedChatHome(
23 ShardingStrategy shardingStrategy)
25 this.chatHomes = chatHomes;
26 this.shardingStrategy = shardingStrategy;
27 this.ownedShards = new HashSet<>();
28 for (int shard = 0; shard < chatHomes.length; shard++)
29 if(chatHomes[shard] != null)
30 this.ownedShards.add(shard);
32 "Created ShardedChatHome for shards: {}",
36 .collect(Collectors.joining(", ")));
41 public Mono<ChatRoom> getChatRoom(UUID id)
43 return chatHomes[selectShard(id)].getChatRoom(id);
47 public Flux<ChatRoom> getChatRooms()
50 .fromIterable(ownedShards)
51 .flatMap(shard -> chatHomes[shard].getChatRooms());
55 private int selectShard(UUID chatroomId)
57 return shardingStrategy.selectShard(chatroomId);