test: Added tests for a chat-home (without shards)
authorKai Moritz <kai@juplo.de>
Tue, 22 Aug 2023 15:33:36 +0000 (17:33 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 27 Jan 2024 14:16:22 +0000 (15:16 +0100)
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java [new file with mode: 0644]

index 961275e..ea30441 100644 (file)
@@ -36,7 +36,7 @@ import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
 @Slf4j
 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
 {
-  final static String TOPIC = "TEST_CHATROOM_CHANNEL";
+  final static String TOPIC = "KAFKA_CONFIGURATION_IT";
 
   static CompletableFuture<Void> CONSUMER_JOB;
 
diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java
new file mode 100644 (file)
index 0000000..2a995ef
--- /dev/null
@@ -0,0 +1,66 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+@ExtendWith(SpringExtension.class)
+public abstract class ChatHomeTest
+{
+  @Autowired
+  ChatHome chatHome;
+
+
+  @Test
+  @DisplayName("Assert chatroom is delivered, if it exists")
+  void testGetExistingChatroom()
+  {
+    // Given
+    UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+
+    // When
+    Mono<ChatRoom> mono = Mono
+        .defer(() -> chatHome.getChatRoom(chatRoomId))
+        .log("testGetExistingChatroom")
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException));
+
+    // Then
+    assertThat(mono).emitsCount(1);
+  }
+
+  @Test
+  @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
+  void testGetNonExistentChatroom()
+  {
+    // Given
+    UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
+
+    // When
+    Mono<ChatRoom> mono = Mono
+        .defer(() -> chatHome.getChatRoom(chatRoomId))
+        .log("testGetNonExistentChatroom")
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException));
+
+    // Then
+    assertThat(mono).sendsError(e ->
+    {
+      assertThat(e).isInstanceOf(UnknownChatroomException.class);
+      UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
+      assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
+    });
+  }
+}
index e45f1a5..761e700 100644 (file)
@@ -1,59 +1,46 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeTest;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
 import java.time.Clock;
-import java.util.UUID;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
 
 
-public class SimpleChatHomeTest
+public class SimpleChatHomeTest extends ChatHomeTest
 {
-  @Test
-  @DisplayName("Assert chatroom is delivered, if it exists")
-  void testGetExistingChatroom()
+  @TestConfiguration
+  static class Configuration
   {
-    // Given
-    InMemoryChatHomeService chatHomeService = mock(InMemoryChatHomeService.class);
-    ChatRoom chatRoom = new ChatRoom(
-        UUID.randomUUID(),
-        "Foo",
-        0,
-        Clock.systemDefaultZone(),
-        mock(ChatRoomService.class),
-        8);
-    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
-    SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
-    // When
-    Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
-
-    // Then
-    assertThat(mono).emitsExactly(chatRoom);
-  }
-
-  @Test
-  @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
-  void testGetNonExistentChatroom()
-  {
-    // Given
-    InMemoryChatHomeService chatHomeService = mock(InMemoryChatHomeService.class);
-    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
-    SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
-    // When
-    Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
-
-    // Then
-    assertThat(mono).sendsError();
+    @Bean
+    SimpleChatHome chatHome(InMemoryChatHomeService chatHomeService)
+    {
+      return new SimpleChatHome(chatHomeService);
+    }
+
+    @Bean
+    InMemoryChatHomeService chatHomeService(StorageStrategy storageStrategy)
+    {
+      return new InMemoryChatHomeService(
+          1,
+          new int[] { 0 },
+          storageStrategy.read());
+    }
+
+    @Bean
+    public FilesStorageStrategy storageStrategy()
+    {
+      return new FilesStorageStrategy(
+          Paths.get("target", "test-classes", "data", "files"),
+          Clock.systemDefaultZone(),
+          8,
+          chatRoomId -> 0,
+          messageFlux -> new InMemoryChatRoomService(messageFlux),
+          new ObjectMapper());
+    }
   }
 }
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java
new file mode 100644 (file)
index 0000000..4fd8372
--- /dev/null
@@ -0,0 +1,112 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHomeTest;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.time.Clock;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
+
+
+@SpringBootTest(
+    classes = {
+        KafkaChatHomeTest.KafkaChatHomeTestConfiguration.class,
+        KafkaServicesConfiguration.class,
+        KafkaAutoConfiguration.class,
+        TaskExecutionAutoConfiguration.class,
+    },
+    properties = {
+    "chat.backend.services=kafka",
+    "chat.backend.kafka.client-id-PREFIX=TEST",
+    "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+    "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+    "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+    "chat.backend.kafka.num-partitions=10",
+})
+@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@Slf4j
+public class KafkaChatHomeTest extends ChatHomeTest
+{
+  final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
+
+  static CompletableFuture<Void> CONSUMER_JOB;
+
+
+  @TestConfiguration
+  @EnableConfigurationProperties(ChatBackendProperties.class)
+  static class KafkaChatHomeTestConfiguration
+  {
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+  }
+
+
+  @BeforeAll
+  public static void sendAndLoadStoredData(
+      @Autowired KafkaTemplate<String, String> messageTemplate,
+      @Autowired Consumer chatRoomChannelConsumer,
+      @Autowired ThreadPoolTaskExecutor taskExecutor,
+      @Autowired ChatRoomChannel chatRoomChannel)
+  {
+    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+
+    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
+    chatRoomChannelConsumer.assign(assignedPartitions);
+    chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+    CONSUMER_JOB = taskExecutor
+        .submitCompletable(chatRoomChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+          return null;
+        });
+  }
+
+  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  {
+    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+    record.headers().add("__TypeId__", typeId.getBytes());
+    SendResult<String, String> result = kafkaTemplate.send(record).join();
+    log.info(
+        "Sent {}={} to {}",
+        key,
+        value,
+        new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
+  }
+
+  @AfterAll
+  static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
+  {
+    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+    chatRoomChannelConsumer.wakeup();
+    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+    CONSUMER_JOB.join();
+    log.info("Joined the consumer of the ChatRoomChannel");
+  }
+}