WIP:KafkaChatHomeTest
authorKai Moritz <kai@juplo.de>
Fri, 25 Aug 2023 10:12:32 +0000 (12:12 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 25 Aug 2023 10:12:32 +0000 (12:12 +0200)
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java

index 961275e..ea30441 100644 (file)
@@ -36,7 +36,7 @@ import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
 @Slf4j
 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
 {
-  final static String TOPIC = "TEST_CHATROOM_CHANNEL";
+  final static String TOPIC = "KAFKA_CONFIGURATION_IT";
 
   static CompletableFuture<Void> CONSUMER_JOB;
 
index ab24082..1298122 100644 (file)
@@ -1,12 +1,40 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTestBase;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
+import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
+
+
+@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@Slf4j
 public class KafkaChatHomeTest extends ChatHomeWithShardsTestBase
 {
+  final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
+
+  static CompletableFuture<Void> CONSUMER_JOB;
+
+
   @TestConfiguration
   static class Configuration
   {
@@ -16,10 +44,72 @@ public class KafkaChatHomeTest extends ChatHomeWithShardsTestBase
       return new KafkaChatHome(numShards(), chatRoomChannel);
     }
 
+    @Bean
+    ChatRoomChannel chatRoomChannel(
+        Producer<String, AbstractMessageTo> chatRoomChannelProducer,
+        Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+        ZoneId zoneId,
+        Clock clock)
+    {
+      return new ChatRoomChannel(
+          TOPIC,
+          chatRoomChannelProducer,
+          chatRoomChannelConsumer,
+          zoneId,
+          numShards(),
+          8,
+          clock);
+    }
+
     Integer numShards()
     {
       return 10;
     }
+  }
+  @BeforeAll
+  public static void sendAndLoadStoredData(
+      @Autowired KafkaTemplate<String, String> messageTemplate,
+      @Autowired Consumer chatRoomChannelConsumer,
+      @Autowired ThreadPoolTaskExecutor taskExecutor,
+      @Autowired ChatRoomChannel chatRoomChannel)
+  {
+    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+
+    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
+    chatRoomChannelConsumer.assign(assignedPartitions);
+    chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+    CONSUMER_JOB = taskExecutor
+        .submitCompletable(chatRoomChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+          return null;
+        });
+  }
 
+  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  {
+    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+    record.headers().add("__TypeId__", typeId.getBytes());
+    SendResult<String, String> result = kafkaTemplate.send(record).join();
+    log.info(
+        "Sent {}={} to {}",
+        key,
+        value,
+        new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
+  }
+
+  @AfterAll
+  static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
+  {
+    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+    chatRoomChannelConsumer.wakeup();
+    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+    CONSUMER_JOB.join();
+    log.info("Joined the consumer of the ChatRoomChannel");
   }
 }