refactor: Simplified implementation - Removed interface `ChatRoomFactory`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
6 import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.common.utils.Utils;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.util.*;
14
15
16 @RequiredArgsConstructor
17 @Slf4j
18 public class KafkaChatHome implements ChatHome
19 {
20   private final int numPartitions;
21   private final ChatRoomChannel chatRoomChannel;
22
23
24
25   @Override
26   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
27   {
28     log.info("Sending create-command for chat rooom: id={}, name={}");
29     return chatRoomChannel.sendCreateChatRoomRequest(id, name);
30   }
31
32   @Override
33   public Mono<ChatRoom> getChatRoom(UUID id)
34   {
35     int shard = selectShard(id);
36     return chatRoomChannel
37         .getChatRoom(shard, id)
38         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
39             id,
40             shard,
41             chatRoomChannel.getOwnedShards())));
42   }
43
44   int selectShard(UUID chatRoomId)
45   {
46     byte[] serializedKey = chatRoomId.toString().getBytes();
47     return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
48   }
49
50   @Override
51   public Flux<ChatRoom> getChatRooms()
52   {
53       return chatRoomChannel.getChatRooms();
54   }
55 }