1 package de.juplo.kafka.chat.backend.implementation.inmemory;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
5 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
6 import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
12 import java.util.Arrays;
13 import java.util.HashSet;
15 import java.util.UUID;
16 import java.util.stream.Collectors;
20 public class ShardedChatHomeService implements ChatHomeService
22 private final String instanceId;
23 private final SimpleChatHomeService[] chatHomes;
24 private final Set<Integer> ownedShards;
25 private final String[] shardOwners;
26 private final ShardingStrategy shardingStrategy;
29 public ShardedChatHomeService(
31 SimpleChatHomeService[] chatHomes,
33 ShardingStrategy shardingStrategy)
35 this.instanceId = instanceId;
36 this.chatHomes = chatHomes;
37 this.shardOwners = Arrays
39 .map(uri -> uri.toASCIIString())
40 .toArray(size -> new String[size]);
41 this.shardingStrategy = shardingStrategy;
42 this.ownedShards = new HashSet<>();
43 for (int shard = 0; shard < chatHomes.length; shard++)
44 if(chatHomes[shard] != null)
45 this.ownedShards.add(shard);
46 log.info("Created {}", this);
51 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
53 int shard = shardingStrategy.selectShard(id);
54 return chatHomes[shard] == null
55 ? Mono.error(new ShardNotOwnedException(instanceId, shard))
56 : chatHomes[shard].createChatRoom(id, name);
60 public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
62 int shard = selectShard(id);
63 return chatHomes[shard] == null
64 ? Mono.error(new ShardNotOwnedException(instanceId, shard))
67 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
68 ? new UnknownChatroomException(
71 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
76 public Flux<ChatRoomInfo> getChatRoomInfo()
79 .fromIterable(ownedShards)
80 .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
84 public Mono<ChatRoomData> getChatRoomData(UUID id)
86 int shard = selectShard(id);
87 return chatHomes[shard] == null
88 ? Mono.error(new ShardNotOwnedException(instanceId, shard))
91 .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
92 ? new UnknownChatroomException(
95 ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
100 public Mono<String[]> getShardOwners()
102 return Mono.just(shardOwners);
105 private int selectShard(UUID chatroomId)
107 return shardingStrategy.selectShard(chatroomId);
111 public String toString()
113 StringBuffer stringBuffer = new StringBuffer(ShardedChatHomeService.class.getSimpleName());
114 stringBuffer.append(", shards=[");
115 stringBuffer.append(ownedShards
118 .map(String::valueOf)
119 .collect(Collectors.joining(",")));
120 stringBuffer.append("]");
121 return stringBuffer.toString();