+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatHome implements ChatHome
+{
+ private final ShardingStrategy shardingStrategy;
+ private final ChatMessageChannel chatMessageChanel;
+
+
+ @Override
+ public Mono<ChatRoom> getChatRoom(UUID id)
+ {
+ int shard = shardingStrategy.selectShard(id);
+ if (chatMessageChanel.isLoadInProgress())
+ {
+ throw new LoadInProgressException(shard);
+ }
+ else
+ {
+ return chatMessageChanel.getChatRoom(shard, id);
+ }
+ }
+
+ @Override
+ public Flux<ChatRoom> getChatRooms()
+ {
+ if (chatMessageChanel.isLoadInProgress())
+ {
+ throw new LoadInProgressException();
+ }
+ else
+ {
+ return chatMessageChanel.getChatRooms();
+ }
+ }
+}