1 package de.juplo.kafka.chat.backend.domain;
3 import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
4 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
5 import org.junit.jupiter.api.DisplayName;
6 import org.junit.jupiter.api.Test;
7 import reactor.core.publisher.Mono;
8 import reactor.util.retry.Retry;
10 import java.time.Duration;
11 import java.util.UUID;
13 import static pl.rzrz.assertj.reactor.Assertions.assertThat;
16 public abstract class ChatHomeServiceWithShardsTest extends ChatHomeServiceTest
// Shard-related constants shared by the sharded ChatHomeService tests.
// NOTE(review): presumably the total number of shards the service under test
// is configured with; the wiring is not visible in this class - confirm
// against the concrete subclasses.
18 public static final int NUM_SHARDS = 10;
// NOTE(review): not referenced in the visible part of this class; presumably
// the shard the service under test is set up to own - confirm.
19 public static final int OWNED_SHARD = 2;
// Shard number that testGetChatroomForNotOwnedShard() expects the thrown
// ShardNotOwnedException to report via getShard().
20 public static final int NOT_OWNED_SHARD = 0;
24 @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
25 void testGetChatroomForNotOwnedShard()
28 UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
31 Mono<ChatRoomData> mono = Mono
32 .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
33 .log("testGetChatroomForNotOwnedShard")
35 .backoff(5, Duration.ofSeconds(1))
36 .filter(throwable -> throwable instanceof ChannelNotReadyException));
39 assertThat(mono).sendsError(e ->
41 assertThat(e).isInstanceOf(ShardNotOwnedException.class);
42 ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
43 assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);