X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatRoomChannel.java;h=234554ebbdd267ae1adefbc90d95190b7c99b767;hb=a39837c0ddf444dd98b371eaf8226ad865543519;hp=7f93ad8bf78af98f35bae6ed83f6d7f4657aeb2a;hpb=e7af512057440075a779ff5a5401dd11fc962741;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 7f93ad8b..234554eb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -1,9 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; @@ -360,16 +357,24 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } } + int[] getOwnedShards() + { + return IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .toArray(); + } + Mono getChatRoom(int shard, UUID id) { if (loadInProgress) { - throw new LoadInProgressException(shard); + return Mono.error(new LoadInProgressException()); } if (!isShardOwned[shard]) { - throw new ShardNotOwnedException(shard); + return Mono.error(new ShardNotOwnedException(shard)); } return Mono.justOrEmpty(chatrooms[shard].get(id));