LoadInProgressException - ALIGN
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
index 7f93ad8..f6f50cc 100644 (file)
@@ -1,9 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
@@ -360,11 +357,19 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
+  int[] getOwnedShards()
+  {
+    return IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .toArray();
+  }
+
   Mono<ChatRoom> getChatRoom(int shard, UUID id)
   {
     if (loadInProgress)
     {
-      throw new LoadInProgressException(shard);
+      throw new LoadInProgressException();
     }
 
     if (!isShardOwned[shard])