67990409e9b559e712b150a41d28365e8403b92d
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHome.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHome;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.common.utils.Utils;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
10
11 import java.util.*;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class KafkaChatHome implements ChatHome
17 {
18   private final int numPartitions;
19   private final ChatRoomChannel chatRoomChannel;
20
21
22   @Override
23   public Mono<ChatRoom> getChatRoom(UUID id)
24   {
25     int shard = selectShard(id);
26     return chatRoomChannel.getChatRoom(shard, id);
27   }
28
29   int selectShard(UUID chatRoomId)
30   {
31     byte[] serializedKey = chatRoomId.toString().getBytes();
32     return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
33   }
34
35   @Override
36   public Flux<ChatRoom> getChatRooms()
37   {
38       return chatRoomChannel.getChatRooms();
39   }
40 }