NG
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
10
11 import java.util.*;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class KafkaChatHome implements ChatHome
17 {
18   private final ShardingStrategy shardingStrategy;
19   private final ChatRoomChannel chatRoomChannel;
20   private final ChatMessageChannel chatMessageChanel;
21
22
23   @Override
24   public Mono<ChatRoom> getChatRoom(UUID id)
25   {
26     int shard = shardingStrategy.selectShard(id);
27     if (chatMessageChanel.isLoadInProgress())
28     {
29       throw new LoadInProgressException(shard);
30     }
31     else
32     {
33       return chatMessageChanel.getChatRoom(shard, id);
34     }
35   }
36
37   @Override
38   public Flux<ChatRoomInfo> getChatRooms()
39   {
40       return chatRoomChannel.getChatRooms();
41   }
42 }