1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
6 import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.common.utils.Utils;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
/**
 * {@link ChatHome} implementation whose visible operations all delegate to a
 * {@link ChatRoomChannel}.
 * <p>
 * Lombok's {@code @RequiredArgsConstructor} generates the constructor that
 * takes the two final fields declared below.
 * <p>
 * NOTE(review): {@code createChatRoom} uses a {@code log} field that has no
 * visible declaration; presumably the imported {@code @Slf4j} annotation is
 * expected on this class -- confirm.
 */
16 @RequiredArgsConstructor
18 public class KafkaChatHome implements ChatHome
// Modulus used by selectShard(UUID) to map a chat room ID onto a shard number
20 private final int numPartitions;
// Channel that the chat room operations of this class are delegated to
21 private final ChatRoomChannel chatRoomChannel;
26 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
28 log.info("Sending create-command for chat rooom: id={}, name={}");
29 return chatRoomChannel.sendCreateChatRoomRequest(id, name);
/**
 * Looks up a chat room by its ID.
 * <p>
 * The shard is computed with {@code selectShard(id)} and the lookup is
 * delegated to {@code chatRoomChannel.getChatRoom(shard, id)}. If the
 * returned {@link Mono} completes empty, an error signal carrying an
 * {@link UnknownChatroomException} is emitted instead.
 *
 * @param id the ID of the requested chat room
 * @return the {@link Mono} from the channel, or an error {@link Mono} if the
 *         channel yields no chat room
 */
33 public Mono<ChatRoom> getChatRoom(UUID id)
35 int shard = selectShard(id);
36 return chatRoomChannel
37 .getChatRoom(shard, id)
// The supplier form of Mono.error() creates the exception lazily, so nothing
// is instantiated unless the empty case actually occurs
38 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
// NOTE(review): the embedded numbering jumps from 38 to 41 here -- further
// constructor arguments may be missing from this view; confirm
41 chatRoomChannel.getOwnedShards())));
44 int selectShard(UUID chatRoomId)
46 byte[] serializedKey = chatRoomId.toString().getBytes();
47 return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
51 public Flux<ChatRoom> getChatRooms()
53 return chatRoomChannel.getChatRooms();