]> juplo.de Git - demos/kafka/chat/commitdiff
test: Refined `KafkaConfigurationIT` (info-topic is really loaded)
authorKai Moritz <kai@juplo.de>
Sun, 24 Mar 2024 10:50:37 +0000 (11:50 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 24 Mar 2024 19:28:44 +0000 (20:28 +0100)
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java

index 1fa767fa449e4f502706af46f95f73306b0acae0..eee20e5e42f81837313ccf0d6a7f38a563aa05b5 100644 (file)
@@ -1,21 +1,15 @@
 package de.juplo.kafka.chat.backend;
 
 import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor;
-import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
 import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils;
-import de.juplo.kafka.chat.backend.implementation.kafka.WorkAssignor;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
@@ -45,20 +39,15 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
   @BeforeAll
   static void sendAndLoadStoredData(
       @Autowired KafkaTemplate<String, String> messageTemplate,
+      @Autowired ChannelTaskExecutor infoChannelTaskExecutor,
       @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
   {
-    KafkaTestUtils.sendAndLoadStoredData(
-        messageTemplate,
+    KafkaTestUtils.initKafkaSetup(
         INFO_TOPIC,
-        DATA_TOPIC);
-
-    // The initialization of the data-channel must happen,
-    // after the messages were sent into the topic of the
-    // test-cluster.
-    // Otherwise, the initial loading of the data might be
-    // completed, before these messages arrive, so that
-    // they are ignored and the state is never restored.
-    dataChannelTaskExecutor.executeChannelTask();
+        DATA_TOPIC,
+        messageTemplate,
+        infoChannelTaskExecutor,
+        dataChannelTaskExecutor);
   }
 
 
@@ -66,25 +55,5 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
   @Import(KafkaTestUtils.KafkaTestConfiguration.class)
   static class KafkaConfigurationITConfiguration
   {
-    /**
-     * The definition of this bean has to be overruled, so
-     * that the configuration of the `initMethod`, which
-     * has to be called explicitly, _after_ the messages
-     * were sent to and received by the test-culster, can
-     * be dropped.
-     */
-    @Bean(destroyMethod = "join")
-    ChannelTaskExecutor dataChannelTaskExecutor(
-        ThreadPoolTaskExecutor taskExecutor,
-        DataChannel dataChannel,
-        Consumer<String, AbstractMessageTo> dataChannelConsumer,
-        WorkAssignor dataChannelWorkAssignor)
-    {
-      return new ChannelTaskExecutor(
-          taskExecutor,
-          dataChannel,
-          dataChannelConsumer,
-          dataChannelWorkAssignor);
-    }
   }
 }
index 1c089c3f3d5d3c695340a24ce78cb0bf9dcc701d..1fec5266e6009cc6cd7edf84f7ed9cbc9be2f944 100644 (file)
@@ -2,15 +2,20 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.TestPropertySource;
 
+import java.util.List;
+
 import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
 import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC;
 import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC;
@@ -42,11 +47,34 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
   @BeforeAll
   static void sendAndLoadStoredData(
-      @Autowired KafkaTemplate<String, String> messageTemplate)
+      @Autowired KafkaTemplate<String, String> messageTemplate,
+      @Autowired ChannelTaskExecutor infoChannelTaskExecutor,
+      @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
   {
-    KafkaTestUtils.sendAndLoadStoredData(
-        messageTemplate,
+    KafkaTestUtils.initKafkaSetup(
         INFO_TOPIC,
-        DATA_TOPIC);
+        DATA_TOPIC,
+        messageTemplate,
+        infoChannelTaskExecutor,
+        dataChannelTaskExecutor);
+  }
+
+
+  @TestConfiguration
+  static class KafkaChatHomeServiceTestConfiguration
+  {
+    @Bean
+    WorkAssignor infoChannelWorkAssignor()
+    {
+      return consumer ->
+      {
+        List<TopicPartition> partitions = consumer
+            .partitionsFor(INFO_TOPIC)
+            .stream()
+            .map(partitionInfo -> new TopicPartition(INFO_TOPIC, partitionInfo.partition()))
+            .toList();
+        consumer.assign(partitions);
+      };
+    }
   }
 }
index c9163976b068aee1fa5800452346a55a05da1705..fa22b3623168d90c29872c501ca3102ba90605e3 100644 (file)
@@ -2,12 +2,15 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import reactor.core.publisher.Mono;
 
 import java.util.List;
@@ -16,16 +19,38 @@ import java.util.List;
 @Slf4j
 public abstract class KafkaTestUtils
 {
+  public static void initKafkaSetup(
+      String infoTopic,
+      String dataTopic,
+      KafkaTemplate<String, String> messageTemplate,
+      ChannelTaskExecutor infoChannelTaskExecutor,
+      ChannelTaskExecutor dataChannelTaskExecutor)
+  {
+    KafkaTestUtils.sendAndLoadStoredData(
+        messageTemplate,
+        infoTopic,
+        dataTopic);
+
+    // The initialization of the channels must happen,
+    // after the messages were sent into the topics of the
+    // test-cluster.
+    // Otherwise, the initial loading of the data might be
+    // completed, before these messages arrive, so that
+    // they are ignored and the state is never restored.
+    infoChannelTaskExecutor.executeChannelTask();
+    dataChannelTaskExecutor.executeChannelTask();
+  }
+
   public static class KafkaTestConfiguration
   {
     @Bean
-    public ShardingPublisherStrategy shardingPublisherStrategy()
+    ShardingPublisherStrategy shardingPublisherStrategy()
     {
       return shard -> Mono.just("MOCKED!");
     }
 
     @Bean
-    public WorkAssignor dataChannelWorkAssignor(
+    WorkAssignor dataChannelWorkAssignor(
         ChatBackendProperties properties,
         DataChannel dataChannel)
     {
@@ -37,10 +62,52 @@ public abstract class KafkaTestUtils
         dataChannel.onPartitionsAssigned(assignedPartitions);
       };
     }
+
+    /**
+     * The definition of this bean has to be overruled, so
+     * that the configuration of the `initMethod`, which
+     * has to be called explicitly, _after_ the messages
+     * were sent to and received by the test-culster, can
+     * be dropped.
+     */
+    @Bean(destroyMethod = "join")
+    ChannelTaskExecutor infoChannelTaskExecutor(
+        ThreadPoolTaskExecutor taskExecutor,
+        InfoChannel infoChannel,
+        Consumer<String, AbstractMessageTo> infoChannelConsumer,
+        WorkAssignor infoChannelWorkAssignor)
+    {
+      return new ChannelTaskExecutor(
+          taskExecutor,
+          infoChannel,
+          infoChannelConsumer,
+          infoChannelWorkAssignor);
+    }
+
+    /**
+     * The definition of this bean has to be overruled, so
+     * that the configuration of the `initMethod`, which
+     * has to be called explicitly, _after_ the messages
+     * were sent to and received by the test-culster, can
+     * be dropped.
+     */
+    @Bean(destroyMethod = "join")
+    ChannelTaskExecutor dataChannelTaskExecutor(
+        ThreadPoolTaskExecutor taskExecutor,
+        DataChannel dataChannel,
+        Consumer<String, AbstractMessageTo> dataChannelConsumer,
+        WorkAssignor dataChannelWorkAssignor)
+    {
+      return new ChannelTaskExecutor(
+          taskExecutor,
+          dataChannel,
+          dataChannelConsumer,
+          dataChannelWorkAssignor);
+    }
   }
 
 
-  public static void sendAndLoadStoredData(
+  private static void sendAndLoadStoredData(
       KafkaTemplate<String, String> messageTemplate,
       String infoTopic,
       String dataTopic)