package de.juplo.kafka.chat.backend.persistence.kafka;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.*;
import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
}
}
+ int[] getOwnedShards()
+ {
+ return IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .toArray();
+ }
+
Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
if (loadInProgress)
{
- throw new LoadInProgressException(shard);
+ return Mono.error(new LoadInProgressException());
}
if (!isShardOwned[shard])
{
- throw new ShardNotOwnedException(shard);
+ return Mono.error(new ShardNotOwnedException(shard));
}
return Mono.justOrEmpty(chatrooms[shard].get(id));