8c749d659c96d534d1ed0fd84854224e8d9f9937
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomData;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
6 import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.common.utils.Utils;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.util.*;
14
15
16 @RequiredArgsConstructor
17 @Slf4j
18 public class KafkaChatHome implements ChatHome
19 {
20   private final int numPartitions;
21   private final ChatRoomChannel chatRoomChannel;
22
23
24
25   @Override
26   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
27   {
28     log.info("Sending create-command for chat rooom: id={}, name={}");
29     return chatRoomChannel.sendCreateChatRoomRequest(id, name);
30   }
31
32   @Override
33   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
34   {
35     int shard = selectShard(id);
36     return chatRoomChannel
37         .getChatRoomInfo(shard, id)
38         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
39             id,
40             shard,
41             chatRoomChannel.getOwnedShards())));
42   }
43
44   @Override
45   public Flux<ChatRoomInfo> getChatRoomInfo()
46   {
47     return chatRoomChannel.getChatRoomInfo();
48   }
49
50   @Override
51   public Mono<ChatRoomData> getChatRoomData(UUID id)
52   {
53     int shard = selectShard(id);
54     return chatRoomChannel
55         .getChatRoomData(shard, id)
56         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
57             id,
58             shard,
59             chatRoomChannel.getOwnedShards())));
60   }
61
62   public Flux<ChatRoomData> getChatRoomData()
63   {
64       return chatRoomChannel.getChatRoomData();
65   }
66
67   int selectShard(UUID chatRoomId)
68   {
69     byte[] serializedKey = chatRoomId.toString().getBytes();
70     return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
71   }
72 }