1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.common.utils.Utils;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
14 @RequiredArgsConstructor
16 public class KafkaChatHome implements ChatHome
18 private final int numPartitions;
19 private final ChatRoomChannel chatRoomChannel;
23 public Mono<ChatRoom> getChatRoom(UUID id)
25 int shard = selectShard(id);
26 return chatRoomChannel.getChatRoom(shard, id);
29 int selectShard(UUID chatRoomId)
31 byte[] serializedKey = chatRoomId.toString().getBytes();
32 return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
36 public Flux<ChatRoom> getChatRooms()
38 return chatRoomChannel.getChatRooms();