1 package de.juplo.kafka.chat.backend.implementation.inmemory;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
5 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
6 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
11 import java.util.HashSet;
13 import java.util.UUID;
14 import java.util.stream.Collectors;
18 public class ShardedChatHomeService implements ChatHomeService
20 private final SimpleChatHomeService[] chatHomes;
21 private final Set<Integer> ownedShards;
22 private final ShardingStrategy shardingStrategy;
25 public ShardedChatHomeService(
26 SimpleChatHomeService[] chatHomes,
27 ShardingStrategy shardingStrategy)
29 this.chatHomes = chatHomes;
30 this.shardingStrategy = shardingStrategy;
31 this.ownedShards = new HashSet<>();
32 for (int shard = 0; shard < chatHomes.length; shard++)
33 if(chatHomes[shard] != null)
34 this.ownedShards.add(shard);
36 "Created ShardedChatHome for shards: {}",
40 .collect(Collectors.joining(", ")));
45 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
47 int shard = shardingStrategy.selectShard(id);
48 return chatHomes[shard] == null
49 ? Mono.error(new ShardNotOwnedException(shard))
50 : chatHomes[shard].createChatRoom(id, name);
54 public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
56 int shard = selectShard(id);
57 return chatHomes[shard] == null
58 ? Mono.error(new ShardNotOwnedException(shard))
61 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
62 ? new UnknownChatroomException(
65 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
70 public Flux<ChatRoomInfo> getChatRoomInfo()
73 .fromIterable(ownedShards)
74 .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
78 public Mono<ChatRoomData> getChatRoomData(UUID id)
80 int shard = selectShard(id);
81 return chatHomes[shard] == null
82 ? Mono.error(new ShardNotOwnedException(shard))
85 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
86 ? new UnknownChatroomException(
89 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
93 public Flux<ChatRoomData> getChatRoomData()
96 .fromIterable(ownedShards)
97 .flatMap(shard -> chatHomes[shard].getChatRoomData());
102 private int selectShard(UUID chatroomId)
104 return shardingStrategy.selectShard(chatroomId);