X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2FKafkaConfigurationIT.java;h=1fa767fa449e4f502706af46f95f73306b0acae0;hb=0e5178b7a3b8c8f3167f99c2d36452a2a68c80fa;hp=e01e012d1ba2dd7d8f26fbccafde5a7959bccfd5;hpb=f604c5ad4ce13cc7ca90816a0ed58b4de4caeec6;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index e01e012d..1fa767fa 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -1,16 +1,21 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.implementation.kafka.*; +import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor; +import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel; +import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils; +import de.juplo.kafka.chat.backend.implementation.kafka.WorkAssignor; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterAll; +import org.apache.kafka.clients.consumer.Consumer; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC; import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC; @@ -37,27 +42,23 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL"; final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL"; - @MockBean - KafkaServicesApplicationRunner kafkaServicesApplicationRunner; - @BeforeAll - public static void sendAndLoadStoredData( + static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, - @Autowired ConsumerTaskRunner consumerTaskRunner) + @Autowired ChannelTaskExecutor dataChannelTaskExecutor) { KafkaTestUtils.sendAndLoadStoredData( messageTemplate, INFO_TOPIC, - DATA_TOPIC, - consumerTaskRunner); - } + DATA_TOPIC); - @AfterAll - static void joinConsumerTasks( - @Autowired ConsumerTaskRunner consumerTaskRunner) - throws InterruptedException - { - KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); + // The initialization of the data-channel must happen, + // after the messages were sent into the topic of the + // test-cluster. + // Otherwise, the initial loading of the data might be + // completed, before these messages arrive, so that + // they are ignored and the state is never restored. + dataChannelTaskExecutor.executeChannelTask(); } @@ -65,5 +66,25 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @Import(KafkaTestUtils.KafkaTestConfiguration.class) static class KafkaConfigurationITConfiguration { + /** + * The definition of this bean has to be overruled, so + * that the configuration of the `initMethod`, which + * has to be called explicitly, _after_ the messages + * were sent to and received by the test-culster, can + * be dropped. + */ + @Bean(destroyMethod = "join") + ChannelTaskExecutor dataChannelTaskExecutor( + ThreadPoolTaskExecutor taskExecutor, + DataChannel dataChannel, + Consumer dataChannelConsumer, + WorkAssignor dataChannelWorkAssignor) + { + return new ChannelTaskExecutor( + taskExecutor, + dataChannel, + dataChannelConsumer, + dataChannelWorkAssignor); + } } }