1 package de.juplo.kafka.chat.backend.domain;
import lombok.RequiredArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.UUID;
10 @RequiredArgsConstructor
11 public class ShardedChatHome implements ChatHome
13 private final ChatHome[] chatHomes;
14 private final ShardingStrategy selectionStrategy;
18 public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
20 return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom);
24 public Mono<ChatRoom> getChatRoom(UUID id)
26 return chatHomes[selectShard(id)].getChatRoom(id);
30 public Flux<ChatRoom> getChatRooms()
34 .flatMap(chatHome -> chatHome.getChatRooms());
38 private int selectShard(UUID chatroomId)
40 return selectionStrategy.selectShard(chatroomId);