X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fdomain%2FShardedChatHome.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fdomain%2FShardedChatHome.java;h=ffa7860a99c327b5538b7de54a7e6d0a7abab708;hb=b4b3b9dff26d99f11d21c54ff004a73247bbc84d;hp=0000000000000000000000000000000000000000;hpb=72682edb434aa1ad6e2ce7c3b337711cdb746ef1;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java new file mode 100644 index 00000000..ffa7860a --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java @@ -0,0 +1,42 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.RequiredArgsConstructor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +@RequiredArgsConstructor +public class ShardedChatHome implements ChatHome +{ + private final ChatHome[] chatHomes; + private final ShardingStrategy selectionStrategy; + + + @Override + public Mono putChatRoom(ChatRoom chatRoom) + { + return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom); + } + + @Override + public Mono getChatRoom(UUID id) + { + return chatHomes[selectShard(id)].getChatRoom(id); + } + + @Override + public Flux getChatRooms() + { + return Flux + .fromArray(chatHomes) + .flatMap(chatHome -> chatHome.getChatRooms()); + } + + + private int selectShard(UUID chatroomId) + { + return selectionStrategy.selectShard(chatroomId); + } +}