TEST:sharded
authorKai Moritz <kai@juplo.de>
Tue, 22 Aug 2023 15:33:36 +0000 (17:33 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 29 Aug 2023 17:34:45 +0000 (19:34 +0200)
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java [new file with mode: 0644]

diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTestBase.java
new file mode 100644 (file)
index 0000000..a417e6f
--- /dev/null
@@ -0,0 +1,32 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public class ChatHomeWithShardsTestBase extends ChatHomeTestBase
+{
+  @Test
+  @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
+  void testGetChatroomForNotOwnedShard()
+  {
+    // Given
+    UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
+
+    // When
+    Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoomId);
+
+    // Then
+    assertThat(mono).sendsError(e ->
+    {
+      assertThat(e).isInstanceOf(UnknownChatroomException.class);
+      UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
+      assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
+    });
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java
new file mode 100644 (file)
index 0000000..ff82986
--- /dev/null
@@ -0,0 +1,73 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTestBase;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+
+public class ShardedChatHomeTest extends ChatHomeWithShardsTestBase
+{
+  @TestConfiguration
+  static class Configuration
+  {
+    @Bean
+    ShardedChatHome chatHome(
+        Integer numShards,
+        int[] ownedShards,
+        InMemoryChatHomeService chatHomeService)
+    {
+      SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+
+      IntStream
+          .of(ownedShards)
+          .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
+
+      ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+
+      return new ShardedChatHome(chatHomes, strategy);
+    }
+
+    @Bean
+    InMemoryChatHomeService chatHomeService(
+        Integer numShards,
+        int[] ownedShards,
+        StorageStrategy storageStrategy)
+    {
+      return new InMemoryChatHomeService(
+          numShards,
+          ownedShards,
+          storageStrategy.read());
+    }
+
+    @Bean
+    public FilesStorageStrategy storageStrategy(Integer numShards)
+    {
+      return new FilesStorageStrategy(
+          Paths.get("target", "test-classes", "data", "files"),
+          Clock.systemDefaultZone(),
+          8,
+          new KafkaLikeShardingStrategy(numShards),
+          messageFlux -> new InMemoryChatRoomService(messageFlux),
+          new ObjectMapper());
+    }
+
+    @Bean
+    Integer numShards()
+    {
+      return 10;
+    }
+
+    @Bean
+    int[] ownedShards()
+    {
+      return new int[] { 2 };
+    }
+  }
+}