ffa7860a99c327b5538b7de54a7e6d0a7abab708
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / domain / ShardedChatHome.java
1 package de.juplo.kafka.chat.backend.domain;
2
3 import lombok.RequiredArgsConstructor;
4 import reactor.core.publisher.Flux;
5 import reactor.core.publisher.Mono;
6
7 import java.util.UUID;
8
9
10 @RequiredArgsConstructor
11 public class ShardedChatHome implements ChatHome
12 {
13   private final ChatHome[] chatHomes;
14   private final ShardingStrategy selectionStrategy;
15
16
17   @Override
18   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
19   {
20     return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom);
21   }
22
23   @Override
24   public Mono<ChatRoom> getChatRoom(UUID id)
25   {
26     return chatHomes[selectShard(id)].getChatRoom(id);
27   }
28
29   @Override
30   public Flux<ChatRoom> getChatRooms()
31   {
32     return Flux
33         .fromArray(chatHomes)
34         .flatMap(chatHome -> chatHome.getChatRooms());
35   }
36
37
38   private int selectShard(UUID chatroomId)
39   {
40     return selectionStrategy.selectShard(chatroomId);
41   }
42 }