import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
int shard = selectShard(id);
return chatHomes[shard] == null
? Mono.error(new ShardNotOwnedException(shard))
- : chatHomes[shard].getChatRoom(id);
+ : chatHomes[shard]
+ .getChatRoom(id)
+ .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+ ? new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+ : throwable);
}
@Override