1 package de.juplo.kafka.chat.backend.persistence.inmemory;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
9 import java.util.HashSet;
11 import java.util.UUID;
12 import java.util.stream.Collectors;
16 public class ShardedChatHome implements ChatHome
18 private final SimpleChatHome[] chatHomes;
19 private final Set<Integer> ownedShards;
20 private final ShardingStrategy shardingStrategy;
23 public ShardedChatHome(
24 SimpleChatHome[] chatHomes,
25 ShardingStrategy shardingStrategy)
27 this.chatHomes = chatHomes;
28 this.shardingStrategy = shardingStrategy;
29 this.ownedShards = new HashSet<>();
30 for (int shard = 0; shard < chatHomes.length; shard++)
31 if(chatHomes[shard] != null)
32 this.ownedShards.add(shard);
34 "Created ShardedChatHome for shards: {}",
38 .collect(Collectors.joining(", ")));
43 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
45 int shard = shardingStrategy.selectShard(id);
46 return chatHomes[shard] == null
47 ? Mono.error(new ShardNotOwnedException(shard))
48 : chatHomes[shard].createChatRoom(id, name);
52 public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
54 int shard = selectShard(id);
55 return chatHomes[shard] == null
56 ? Mono.error(new ShardNotOwnedException(shard))
59 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
60 ? new UnknownChatroomException(
63 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
68 public Flux<ChatRoomInfo> getChatRoomInfo()
71 .fromIterable(ownedShards)
72 .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
76 public Mono<ChatRoomData> getChatRoomData(UUID id)
78 int shard = selectShard(id);
79 return chatHomes[shard] == null
80 ? Mono.error(new ShardNotOwnedException(shard))
83 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
84 ? new UnknownChatroomException(
87 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
91 public Flux<ChatRoomData> getChatRoomData()
94 .fromIterable(ownedShards)
95 .flatMap(shard -> chatHomes[shard].getChatRoomData());
100 private int selectShard(UUID chatroomId)
102 return shardingStrategy.selectShard(chatroomId);