test: Added tests for a chat-home _with_ shards
authorKai Moritz <kai@juplo.de>
Tue, 22 Aug 2023 15:33:36 +0000 (17:33 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 27 Jan 2024 14:16:22 +0000 (15:16 +0100)
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java

diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java
new file mode 100644 (file)
index 0000000..eebeea1
--- /dev/null
@@ -0,0 +1,44 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public abstract class ChatHomeWithShardsTest extends ChatHomeTest
+{
+  public static final int NUM_SHARDS = 10;
+  public static final int OWNED_SHARD = 2;
+  public static final int NOT_OWNED_SHARD = 0;
+
+
+  @Test
+  @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
+  void testGetChatroomForNotOwnedShard()
+  {
+    // Given
+    UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
+
+    // When
+    Mono<ChatRoom> mono = Mono
+        .defer(() -> chatHome.getChatRoom(chatRoomId))
+        .log("testGetChatroomForNotOwnedShard")
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException));
+
+    // Then
+    assertThat(mono).sendsError(e ->
+    {
+      assertThat(e).isInstanceOf(ShardNotOwnedException.class);
+      ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
+      assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);
+    });
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java
new file mode 100644 (file)
index 0000000..2370cbe
--- /dev/null
@@ -0,0 +1,61 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+public class ShardedChatHomeTest extends ChatHomeWithShardsTest
+{
+  @TestConfiguration
+  static class Configuration
+  {
+    @Bean
+    ShardedChatHome chatHome(
+        InMemoryChatHomeService chatHomeService)
+    {
+      SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS];
+
+      IntStream
+          .of(ownedShards())
+          .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
+
+      ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
+
+      return new ShardedChatHome(chatHomes, strategy);
+    }
+
+    @Bean
+    InMemoryChatHomeService chatHomeService(
+        StorageStrategy storageStrategy)
+    {
+      return new InMemoryChatHomeService(
+          NUM_SHARDS,
+          ownedShards(),
+          storageStrategy.read());
+    }
+
+    @Bean
+    public FilesStorageStrategy storageStrategy()
+    {
+      return new FilesStorageStrategy(
+          Paths.get("target", "test-classes", "data", "files"),
+          Clock.systemDefaultZone(),
+          8,
+          new KafkaLikeShardingStrategy(NUM_SHARDS),
+          messageFlux -> new InMemoryChatRoomService(messageFlux),
+          new ObjectMapper());
+    }
+
+    int[] ownedShards()
+    {
+      return new int[] { OWNED_SHARD };
+    }
+  }
+}
index 4fd8372..d758a22 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeTest;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -24,6 +24,7 @@ import java.time.Clock;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest.NUM_SHARDS;
 import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
 
 
@@ -40,11 +41,11 @@ import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TO
     "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
     "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
     "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
-    "chat.backend.kafka.num-partitions=10",
+    "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
 })
 @EmbeddedKafka(topics = { TOPIC }, partitions = 10)
 @Slf4j
-public class KafkaChatHomeTest extends ChatHomeTest
+public class KafkaChatHomeTest extends ChatHomeWithShardsTest
 {
   final static String TOPIC = "KAFKA_CHAT_HOME_TEST";