1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomData;
5 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
6 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.common.utils.Utils;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
16 @RequiredArgsConstructor
18 public class KafkaChatHomeService implements ChatHomeService
20 private final int numPartitions;
21 private final InfoChannel infoChannel;
22 private final DataChannel dataChannel;
27 public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
29 int shard = selectShard(id);
31 "Sending create-command for chat rooom: id={}, name={}, shard={}",
35 return infoChannel.sendChatRoomCreatedEvent(id, name, shard);
39 public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
43 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
47 public Flux<ChatRoomInfo> getChatRoomInfo()
49 return infoChannel.getChatRoomInfo();
53 public Mono<ChatRoomData> getChatRoomData(UUID id)
55 int shard = selectShard(id);
57 .getChatRoomData(shard, id)
58 .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
61 dataChannel.getOwnedShards())));
64 int selectShard(UUID chatRoomId)
66 byte[] serializedKey = chatRoomId.toString().getBytes();
67 return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;