3023f782556ec3ea68b6dd101d6cc4d0b5eeef7a
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ShardedChatHome.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.extern.slf4j.Slf4j;
4 import reactor.core.publisher.Flux;
5 import reactor.core.publisher.Mono;
6
7 import java.util.HashSet;
8 import java.util.Set;
9 import java.util.UUID;
10 import java.util.stream.Collectors;
11
12
13 @Slf4j
14 public class ShardedChatHome implements ChatHome
15 {
16   private final ChatHome[] chatHomes;
17   private final Set<Integer> ownedShards;
18   private final ShardingStrategy shardingStrategy;
19
20
21   public  ShardedChatHome(
22       ChatHome[] chatHomes,
23       ShardingStrategy shardingStrategy)
24   {
25     this.chatHomes = chatHomes;
26     this.shardingStrategy = shardingStrategy;
27     this.ownedShards = new HashSet<>();
28     for (int shard = 0; shard < chatHomes.length; shard++)
29       if(chatHomes[shard] != null)
30         this.ownedShards.add(shard);
31     log.info(
32         "Created ShardedChatHome for shards: {}",
33         ownedShards
34             .stream()
35             .map(String::valueOf)
36             .collect(Collectors.joining(", ")));
37   }
38
39
40   @Override
41   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
42   {
43     return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom);
44   }
45
46   @Override
47   public Mono<ChatRoom> getChatRoom(UUID id)
48   {
49     return chatHomes[selectShard(id)].getChatRoom(id);
50   }
51
52   @Override
53   public Flux<ChatRoom> getChatRooms()
54   {
55     return Flux
56         .fromIterable(ownedShards)
57         .flatMap(shard -> chatHomes[shard].getChatRooms());
58   }
59
60
61   private int selectShard(UUID chatroomId)
62   {
63     return shardingStrategy.selectShard(chatroomId);
64   }
65 }