1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomData;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
6 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.common.utils.Utils;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
/**
 * {@link ChatHome} implementation that forwards its operations to a
 * {@link ChatRoomChannel}. Lookups by id are addressed to a shard that is
 * computed from the chat room id and {@code numPartitions}.
 *
 * NOTE(review): the methods log through a {@code log} field whose declaration
 * is not visible in this excerpt - presumably generated by Lombok's
 * {@code @Slf4j}; confirm the annotation is present on the class.
 */
16 @RequiredArgsConstructor
18 public class KafkaChatHome implements ChatHome
// Modulus applied to the hashed chat room id when a shard is selected.
20 private final int numPartitions;
// Channel that all create and lookup operations are delegated to.
21 private final ChatRoomChannel chatRoomChannel;
26 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
28 log.info("Sending create-command for chat rooom: id={}, name={}");
29 return chatRoomChannel.sendCreateChatRoomRequest(id, name);
/**
 * Looks up the info of a single chat room.
 *
 * The shard is computed from the id via {@code selectShard} and passed to the
 * channel together with the id. If the channel's result is empty, the returned
 * {@code Mono} signals an {@link UnknownChatroomException} instead.
 *
 * @param id the identifier of the requested chat room
 * @return the channel's result, or an error if that result is empty
 */
33 public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
35 int shard = selectShard(id);
36 return chatRoomChannel
37 .getChatRoomInfo(shard, id)
// The exception is built inside the lambda, i.e. only when the error is needed.
// NOTE(review): the constructor arguments preceding getOwnedShards() are not
// visible in this excerpt - verify them against the exception's signature.
38 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
41 chatRoomChannel.getOwnedShards())));
45 public Flux<ChatRoomInfo> getChatRoomInfo()
47 return chatRoomChannel.getChatRoomInfo();
/**
 * Looks up the data of a single chat room.
 *
 * The shard is computed from the id via {@code selectShard} and passed to the
 * channel together with the id. If the channel's result is empty, the returned
 * {@code Mono} signals an {@link UnknownChatroomException} instead.
 *
 * @param id the identifier of the requested chat room
 * @return the channel's result, or an error if that result is empty
 */
51 public Mono<ChatRoomData> getChatRoomData(UUID id)
53 int shard = selectShard(id);
54 return chatRoomChannel
55 .getChatRoomData(shard, id)
// The exception is built inside the lambda, i.e. only when the error is needed.
// NOTE(review): the constructor arguments preceding getOwnedShards() are not
// visible in this excerpt - verify them against the exception's signature.
56 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
59 chatRoomChannel.getOwnedShards())));
62 public Flux<ChatRoomData> getChatRoomData()
64 return chatRoomChannel.getChatRoomData();
67 int selectShard(UUID chatRoomId)
69 byte[] serializedKey = chatRoomId.toString().getBytes();
70 return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;