From: Kai Moritz Date: Tue, 22 Aug 2023 15:33:36 +0000 (+0200) Subject: TEST:sharded X-Git-Tag: rebase--2023-09-02--10-22~15 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=89c8e3676c4325b4fb6340be97f322412abd4201;p=demos%2Fkafka%2Fchat TEST:sharded --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java new file mode 100644 index 00000000..a417e6f9 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java @@ -0,0 +1,32 @@ +package de.juplo.kafka.chat.backend.domain; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +import java.util.UUID; + +import static pl.rzrz.assertj.reactor.Assertions.assertThat; + + +public class ChatHomeWithShardsTestBase extends ChatHomeTestBase +{ + @Test + @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned") + void testGetChatroomForNotOwnedShard() + { + // Given + UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19"); + + // When + Mono mono = chatHome.getChatRoom(chatRoomId); + + // Then + assertThat(mono).sendsError(e -> + { + assertThat(e).isInstanceOf(UnknownChatroomException.class); + UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e; + assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId); + }); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java new file mode 100644 index 00000000..ff82986a --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java @@ -0,0 +1,73 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTestBase; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +import java.nio.file.Paths; +import java.time.Clock; +import java.util.stream.IntStream; + + +public class ShardedChatHomeTest extends ChatHomeWithShardsTestBase +{ + @TestConfiguration + static class Configuration + { + @Bean + ShardedChatHome chatHome( + Integer numShards, + int[] ownedShards, + InMemoryChatHomeService chatHomeService) + { + SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; + + IntStream + .of(ownedShards) + .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard)); + + ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); + + return new ShardedChatHome(chatHomes, strategy); + } + + @Bean + InMemoryChatHomeService chatHomeService( + Integer numShards, + int[] ownedShards, + StorageStrategy storageStrategy) + { + return new InMemoryChatHomeService( + numShards, + ownedShards, + storageStrategy.read()); + } + + @Bean + public FilesStorageStrategy storageStrategy(Integer numShards) + { + return new FilesStorageStrategy( + Paths.get("target", "test-classes", "data", "files"), + Clock.systemDefaultZone(), + 8, + new KafkaLikeShardingStrategy(numShards), + messageFlux -> new InMemoryChatRoomService(messageFlux), + new ObjectMapper()); + } + + @Bean + Integer numShards() + { + return 10; + } + + @Bean + int[] ownedShards() + { + return new int[] { 2 }; + } + } +}