1 package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
import java.nio.charset.StandardCharsets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.utils.Utils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
15 @RequiredArgsConstructor
17 public class KafkaChatHome implements ChatHome
19 private final int numPartitions;
20 private final ChatRoomChannel chatRoomChannel;
24 public Mono<ChatRoom> getChatRoom(UUID id)
26 int shard = selectShard(id);
27 return chatRoomChannel
28 .getChatRoom(shard, id)
29 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
32 chatRoomChannel.getOwnedShards())));
35 int selectShard(UUID chatRoomId)
37 byte[] serializedKey = chatRoomId.toString().getBytes();
38 return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
42 public Flux<ChatRoom> getChatRooms()
44 return chatRoomChannel.getChatRooms();