38aecd187e246ea906e9bb228b8e69cd00a1214c
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.util.*;
11
12
13 @Slf4j
14 public class KafkaChatHomeService implements ChatHomeService
15 {
16   private final ChatMessageChannel chatMessageChanel;
17
18
19   public KafkaChatHomeService(ChatMessageChannel chatMessageChannel)
20   {
21     log.debug("Creating KafkaChatHomeService");
22     this.chatMessageChanel = chatMessageChannel;
23   }
24
25
26   @Override
27   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
28   {
29     if (chatMessageChanel.isLoadInProgress())
30     {
31       throw new ShardNotOwnedException(shard);
32     }
33     else
34     {
35       return chatMessageChanel.getChatRoom(shard, id);
36     }
37   }
38
39   @Override
40   public Flux<ChatRoom> getChatRooms(int shard)
41   {
42     if (chatMessageChanel.isLoadInProgress())
43     {
44       throw new ShardNotOwnedException(shard);
45     }
46     else
47     {
48       return chatMessageChanel.getChatRooms(shard);
49     }
50   }
51 }