From c34a596c196eacf592e43218d5f1396556cf5477 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 22 Aug 2023 17:33:36 +0200 Subject: [PATCH] test: Added tests for a chat-home _with_ shards --- .../domain/ChatHomeWithShardsTest.java | 44 +++++++++++++ .../inmemory/ShardedChatHomeTest.java | 61 +++++++++++++++++++ .../persistence/kafka/KafkaChatHomeTest.java | 7 ++- 3 files changed, 109 insertions(+), 3 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java new file mode 100644 index 00000000..eebeea1e --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java @@ -0,0 +1,44 @@ +package de.juplo.kafka.chat.backend.domain; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.time.Duration; +import java.util.UUID; + +import static pl.rzrz.assertj.reactor.Assertions.assertThat; + + +public abstract class ChatHomeWithShardsTest extends ChatHomeTest +{ + public static final int NUM_SHARDS = 10; + public static final int OWNED_SHARD = 2; + public static final int NOT_OWNED_SHARD = 0; + + + @Test + @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned") + void testGetChatroomForNotOwnedShard() + { + // Given + UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19"); + + // When + Mono mono = Mono + .defer(() -> chatHome.getChatRoom(chatRoomId)) + .log("testGetChatroomForNotOwnedShard") + .retryWhen(Retry + .backoff(5, Duration.ofSeconds(1)) + .filter(throwable -> throwable instanceof LoadInProgressException)); + + // Then + assertThat(mono).sendsError(e -> + { + assertThat(e).isInstanceOf(ShardNotOwnedException.class); + ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e; + assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD); + }); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java new file mode 100644 index 00000000..2370cbe1 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java @@ -0,0 +1,61 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +import java.nio.file.Paths; +import java.time.Clock; +import java.util.stream.IntStream; + +public class ShardedChatHomeTest extends ChatHomeWithShardsTest +{ + @TestConfiguration + static class Configuration + { + @Bean + ShardedChatHome chatHome( + InMemoryChatHomeService chatHomeService) + { + SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS]; + + IntStream + .of(ownedShards()) + .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard)); + + ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); + + return new ShardedChatHome(chatHomes, strategy); + } + + @Bean + InMemoryChatHomeService chatHomeService( + StorageStrategy storageStrategy) + { + return new InMemoryChatHomeService( + NUM_SHARDS, + ownedShards(), + storageStrategy.read()); + } + + @Bean + public FilesStorageStrategy storageStrategy() + { + return new FilesStorageStrategy( + Paths.get("target", "test-classes", "data", "files"), + Clock.systemDefaultZone(), + 8, + new KafkaLikeShardingStrategy(NUM_SHARDS), + messageFlux -> new InMemoryChatRoomService(messageFlux), + new ObjectMapper()); + } + + int[] ownedShards() + { + return new int[] { OWNED_SHARD }; + } + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java index 4fd8372a..d758a22d 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java @@ -1,7 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ChatHomeTest; +import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -24,6 +24,7 @@ import java.time.Clock; import java.util.List; import java.util.concurrent.CompletableFuture; +import static de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest.NUM_SHARDS; import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC; @@ -40,11 +41,11 @@ import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TO "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, - "chat.backend.kafka.num-partitions=10", + "chat.backend.kafka.num-partitions=" + NUM_SHARDS, }) @EmbeddedKafka(topics = { TOPIC }, partitions = 10) @Slf4j -public class KafkaChatHomeTest extends ChatHomeTest +public class KafkaChatHomeTest extends ChatHomeWithShardsTest { final static String TOPIC = "KAFKA_CHAT_HOME_TEST"; -- 2.20.1