1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.extern.slf4j.Slf4j;
5 import reactor.core.publisher.Flux;
6 import reactor.core.publisher.Mono;
8 import java.util.HashSet;
10 import java.util.UUID;
11 import java.util.stream.Collectors;
/**
 * {@link ChatHome} implementation that distributes chat rooms over an array
 * of {@link SimpleChatHome} delegates, one array slot per shard.
 * <p>
 * A {@code null} slot marks a shard that this instance does not own; the
 * single-room operations answer with a {@code ShardNotOwnedException} when
 * the id of the requested chat room maps to such a slot.
 */
15 public class ShardedChatHome implements ChatHome
// Per-shard delegates, indexed by shard number; null for shards not owned here.
17 private final SimpleChatHome[] chatHomes;
// Indices of all non-null entries of chatHomes, collected once in the constructor.
// NOTE(review): no import of java.util.Set is visible in this excerpt - confirm it exists.
18 private final Set<Integer> ownedShards;
// Maps the id of a chat room to the shard number that is used as index into chatHomes.
19 private final ShardingStrategy shardingStrategy;
/**
 * Creates a sharded chat home on top of the given per-shard delegates.
 * Every index whose array entry is not {@code null} is recorded as an
 * owned shard.
 *
 * @param chatHomes delegates indexed by shard number; {@code null} entries
 *     mark shards that are not owned. The array is stored as passed in,
 *     without a defensive copy.
 * @param shardingStrategy strategy that maps chat-room ids to shard numbers
 */
22 public ShardedChatHome(
23 SimpleChatHome[] chatHomes,
24 ShardingStrategy shardingStrategy)
26 this.chatHomes = chatHomes;
27 this.shardingStrategy = shardingStrategy;
28 this.ownedShards = new HashSet<>();
// Collect the indices of all slots that actually hold a delegate.
29 for (int shard = 0; shard < chatHomes.length; shard++)
30 if(chatHomes[shard] != null)
31 this.ownedShards.add(shard);
// Reports a comma-separated list via the "{}" placeholder.
// NOTE(review): the head of this statement and the start of the stream chain
// are not visible in this excerpt - presumably a log call that joins the
// owned shards; confirm.
33 "Created ShardedChatHome for shards: {}",
37 .collect(Collectors.joining(", ")));
42 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
44 int shard = shardingStrategy.selectShard(id);
45 return chatHomes[shard] == null
46 ? Mono.error(new ShardNotOwnedException(shard))
47 : chatHomes[shard].createChatRoom(id, name);
/**
 * Looks up the info of a single chat room in the shard that its id maps to.
 *
 * @param id id of the requested chat room; determines the responsible shard
 * @return a {@code Mono} that fails with a {@code ShardNotOwnedException}
 *     if the slot of the selected shard is {@code null}. The non-null branch
 *     is only partly visible in this excerpt - presumably the result of the
 *     responsible delegate; confirm.
 */
51 public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
53 int shard = selectShard(id);
// A null slot marks a shard that is not owned by this instance.
54 return chatHomes[shard] == null
55 ? Mono.error(new ShardNotOwnedException(shard))
// An UnknownChatroomException is replaced by a new instance that additionally
// receives the owned shards as an int[].
// NOTE(review): the other constructor arguments and the else-branch of this
// conditional are not visible in this excerpt - confirm.
58 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
59 ? new UnknownChatroomException(
62 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
/**
 * Streams the chat-room infos of all owned shards.
 *
 * @return the results of {@code getChatRoomInfo()} of every owned shard's
 *     delegate, combined with {@code flatMap}
 */
67 public Flux<ChatRoomInfo> getChatRoomInfo()
// NOTE(review): the head of this expression is not visible in this excerpt -
// presumably "return Flux"; confirm.
// Only indices recorded as owned are visited; their slots were non-null when
// the constructor ran.
70 .fromIterable(ownedShards)
71 .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
/**
 * Looks up the data of a single chat room in the shard that its id maps to.
 *
 * @param id id of the requested chat room; determines the responsible shard
 * @return a {@code Mono} that fails with a {@code ShardNotOwnedException}
 *     if the slot of the selected shard is {@code null}. The non-null branch
 *     is only partly visible in this excerpt - presumably the result of the
 *     responsible delegate; confirm.
 */
75 public Mono<ChatRoomData> getChatRoomData(UUID id)
77 int shard = selectShard(id);
// A null slot marks a shard that is not owned by this instance.
78 return chatHomes[shard] == null
79 ? Mono.error(new ShardNotOwnedException(shard))
// An UnknownChatroomException is replaced by a new instance that additionally
// receives the owned shards as an int[].
// NOTE(review): the other constructor arguments and the else-branch of this
// conditional are not visible in this excerpt - confirm.
82 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
83 ? new UnknownChatroomException(
86 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
/**
 * Streams the chat-room data of all owned shards.
 *
 * @return the results of {@code getChatRoomData()} of every owned shard's
 *     delegate, combined with {@code flatMap}
 */
90 public Flux<ChatRoomData> getChatRoomData()
// NOTE(review): the head of this expression is not visible in this excerpt -
// presumably "return Flux"; confirm.
// Only indices recorded as owned are visited; their slots were non-null when
// the constructor ran.
93 .fromIterable(ownedShards)
94 .flatMap(shard -> chatHomes[shard].getChatRoomData());
99 private int selectShard(UUID chatroomId)
101 return shardingStrategy.selectShard(chatroomId);