9832519dd65bd7d049da91da62526da8b60074da
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomData;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
6 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.common.utils.Utils;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.util.*;
14
15
16 @RequiredArgsConstructor
17 @Slf4j
18 public class KafkaChatHomeService implements ChatHomeService
19 {
20   private final int numPartitions;
21   private final InfoChannel infoChannel;
22   private final DataChannel dataChannel;
23
24
25
26   @Override
27   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
28   {
29     int shard = selectShard(id);
30     log.info(
31         "Sending create-command for chat rooom: id={}, name={}, shard={}",
32         id,
33         name,
34         shard);
35     return infoChannel.sendChatRoomCreatedEvent(id, name, shard);
36   }
37
38   @Override
39   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
40   {
41     return infoChannel
42         .getChatRoomInfo(id)
43         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
44   }
45
46   @Override
47   public Flux<ChatRoomInfo> getChatRoomInfo()
48   {
49     return infoChannel.getChatRoomInfo();
50   }
51
52   @Override
53   public Mono<ChatRoomData> getChatRoomData(UUID id)
54   {
55     int shard = selectShard(id);
56     return dataChannel
57         .getChatRoomData(shard, id)
58         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
59             id,
60             shard,
61             dataChannel.getOwnedShards())));
62   }
63
64   int selectShard(UUID chatRoomId)
65   {
66     byte[] serializedKey = chatRoomId.toString().getBytes();
67     return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
68   }
69 }