refactor: Moved implementation details out of `domain` -- Moved classes
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / ShardedChatHome.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.extern.slf4j.Slf4j;
4 import reactor.core.publisher.Flux;
5 import reactor.core.publisher.Mono;
6
7 import java.util.HashSet;
8 import java.util.Set;
9 import java.util.UUID;
10 import java.util.stream.Collectors;
11
12
13 @Slf4j
14 public class ShardedChatHome implements ChatHome
15 {
16   private final ChatHome[] chatHomes;
17   private final Set<Integer> ownedShards;
18   private final ShardingStrategy shardingStrategy;
19
20
21   public  ShardedChatHome(
22       ChatHome[] chatHomes,
23       ShardingStrategy shardingStrategy)
24   {
25     this.chatHomes = chatHomes;
26     this.shardingStrategy = shardingStrategy;
27     this.ownedShards = new HashSet<>();
28     for (int shard = 0; shard < chatHomes.length; shard++)
29       if(chatHomes[shard] != null)
30         this.ownedShards.add(shard);
31     log.info(
32         "Created ShardedChatHome for shards: {}",
33         ownedShards
34             .stream()
35             .map(String::valueOf)
36             .collect(Collectors.joining(", ")));
37   }
38
39
40   @Override
41   public Mono<ChatRoom> getChatRoom(UUID id)
42   {
43     int shard = selectShard(id);
44     if (chatHomes[shard] == null)
45       throw new ShardNotOwnedException(shard);
46     return chatHomes[shard].getChatRoom(id);
47   }
48
49   @Override
50   public Flux<ChatRoom> getChatRooms()
51   {
52     return Flux
53         .fromIterable(ownedShards)
54         .flatMap(shard -> chatHomes[shard].getChatRooms());
55   }
56
57
58   private int selectShard(UUID chatroomId)
59   {
60     return shardingStrategy.selectShard(chatroomId);
61   }
62 }