WIP:test: `*ConfigurationIT` asserts, if restored messages can be seen
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaConfigurationIT.java
index d9ed8eb..1fa767f 100644 (file)
@@ -1,16 +1,21 @@
 package de.juplo.kafka.chat.backend;
 
-import de.juplo.kafka.chat.backend.implementation.kafka.*;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor;
+import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
+import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils;
+import de.juplo.kafka.chat.backend.implementation.kafka.WorkAssignor;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.AfterAll;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
@@ -37,25 +42,23 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
   final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
   final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
 
-  @MockBean
-  KafkaServicesApplicationRunner kafkaServicesApplicationRunner;
-
   @BeforeAll
-  public static void sendAndLoadStoredData(
+  static void sendAndLoadStoredData(
       @Autowired KafkaTemplate<String, String> messageTemplate,
-      @Autowired ConsumerTaskRunner consumerTaskRunner)
+      @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
   {
     KafkaTestUtils.sendAndLoadStoredData(
         messageTemplate,
         INFO_TOPIC,
-        DATA_TOPIC,
-        consumerTaskRunner);
-  }
+        DATA_TOPIC);
 
-  @AfterAll
-  static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
-  {
-    KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
+    // The initialization of the data-channel must happen,
+    // after the messages were sent into the topic of the
+    // test-cluster.
+    // Otherwise, the initial loading of the data might be
+    // completed, before these messages arrive, so that
+    // they are ignored and the state is never restored.
+    dataChannelTaskExecutor.executeChannelTask();
   }
 
 
@@ -63,5 +66,25 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
   @Import(KafkaTestUtils.KafkaTestConfiguration.class)
   static class KafkaConfigurationITConfiguration
   {
+    /**
+     * The definition of this bean has to be overruled, so
+     * that the configuration of the `initMethod`, which
+     * has to be called explicitly, _after_ the messages
+     * were sent to and received by the test-culster, can
+     * be dropped.
+     */
+    @Bean(destroyMethod = "join")
+    ChannelTaskExecutor dataChannelTaskExecutor(
+        ThreadPoolTaskExecutor taskExecutor,
+        DataChannel dataChannel,
+        Consumer<String, AbstractMessageTo> dataChannelConsumer,
+        WorkAssignor dataChannelWorkAssignor)
+    {
+      return new ChannelTaskExecutor(
+          taskExecutor,
+          dataChannel,
+          dataChannelConsumer,
+          dataChannelWorkAssignor);
+    }
   }
 }