From: Kai Moritz Date: Sun, 24 Mar 2024 10:50:37 +0000 (+0100) Subject: test: Refined `KafkaConfigurationIT` (info-topic is really loaded) X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=dee0476f50282115c32a0ba163aa140a4f9fc3f7;p=demos%2Fkafka%2Fchat test: Refined `KafkaConfigurationIT` (info-topic is really loaded) --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index 1fa767fa..eee20e5e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -1,21 +1,15 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor; -import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel; import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils; -import de.juplo.kafka.chat.backend.implementation.kafka.WorkAssignor; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC; import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC; @@ -45,20 +39,15 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @BeforeAll static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, + @Autowired ChannelTaskExecutor infoChannelTaskExecutor, @Autowired ChannelTaskExecutor dataChannelTaskExecutor) { - KafkaTestUtils.sendAndLoadStoredData( - messageTemplate, + KafkaTestUtils.initKafkaSetup( INFO_TOPIC, - DATA_TOPIC); - - // The initialization of the data-channel must happen, - // after the messages were sent into the topic of the - // test-cluster. - // Otherwise, the initial loading of the data might be - // completed, before these messages arrive, so that - // they are ignored and the state is never restored. - dataChannelTaskExecutor.executeChannelTask(); + DATA_TOPIC, + messageTemplate, + infoChannelTaskExecutor, + dataChannelTaskExecutor); } @@ -66,25 +55,5 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @Import(KafkaTestUtils.KafkaTestConfiguration.class) static class KafkaConfigurationITConfiguration { - /** - * The definition of this bean has to be overruled, so - * that the configuration of the `initMethod`, which - * has to be called explicitly, _after_ the messages - * were sent to and received by the test-culster, can - * be dropped. - */ - @Bean(destroyMethod = "join") - ChannelTaskExecutor dataChannelTaskExecutor( - ThreadPoolTaskExecutor taskExecutor, - DataChannel dataChannel, - Consumer dataChannelConsumer, - WorkAssignor dataChannelWorkAssignor) - { - return new ChannelTaskExecutor( - taskExecutor, - dataChannel, - dataChannelConsumer, - dataChannelWorkAssignor); - } } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 1c089c3f..1fec5266 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -2,15 +2,20 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.TestPropertySource; +import java.util.List; + import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS; import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC; import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC; @@ -42,11 +47,34 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll static void sendAndLoadStoredData( - @Autowired KafkaTemplate messageTemplate) + @Autowired KafkaTemplate messageTemplate, + @Autowired ChannelTaskExecutor infoChannelTaskExecutor, + @Autowired ChannelTaskExecutor dataChannelTaskExecutor) { - KafkaTestUtils.sendAndLoadStoredData( - messageTemplate, + KafkaTestUtils.initKafkaSetup( INFO_TOPIC, - DATA_TOPIC); + DATA_TOPIC, + messageTemplate, + infoChannelTaskExecutor, + dataChannelTaskExecutor); + } + + + @TestConfiguration + static class KafkaChatHomeServiceTestConfiguration + { + @Bean + WorkAssignor infoChannelWorkAssignor() + { + return consumer -> + { + List partitions = consumer + .partitionsFor(INFO_TOPIC) + .stream() + .map(partitionInfo -> new TopicPartition(INFO_TOPIC, partitionInfo.partition())) + .toList(); + consumer.assign(partitions); + }; + } } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index c9163976..fa22b362 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -2,12 +2,15 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import reactor.core.publisher.Mono; import java.util.List; @@ -16,16 +19,38 @@ import java.util.List; @Slf4j public abstract class KafkaTestUtils { + public static void initKafkaSetup( + String infoTopic, + String dataTopic, + KafkaTemplate messageTemplate, + ChannelTaskExecutor infoChannelTaskExecutor, + ChannelTaskExecutor dataChannelTaskExecutor) + { + KafkaTestUtils.sendAndLoadStoredData( + messageTemplate, + infoTopic, + dataTopic); + + // The initialization of the channels must happen, + // after the messages were sent into the topics of the + // test-cluster. + // Otherwise, the initial loading of the data might be + // completed, before these messages arrive, so that + // they are ignored and the state is never restored. + infoChannelTaskExecutor.executeChannelTask(); + dataChannelTaskExecutor.executeChannelTask(); + } + public static class KafkaTestConfiguration { @Bean - public ShardingPublisherStrategy shardingPublisherStrategy() + ShardingPublisherStrategy shardingPublisherStrategy() { return shard -> Mono.just("MOCKED!"); } @Bean - public WorkAssignor dataChannelWorkAssignor( + WorkAssignor dataChannelWorkAssignor( ChatBackendProperties properties, DataChannel dataChannel) { @@ -37,10 +62,52 @@ public abstract class KafkaTestUtils dataChannel.onPartitionsAssigned(assignedPartitions); }; } + + /** + * The definition of this bean has to be overruled, so + * that the configuration of the `initMethod`, which + * has to be called explicitly, _after_ the messages + * were sent to and received by the test-culster, can + * be dropped. + */ + @Bean(destroyMethod = "join") + ChannelTaskExecutor infoChannelTaskExecutor( + ThreadPoolTaskExecutor taskExecutor, + InfoChannel infoChannel, + Consumer infoChannelConsumer, + WorkAssignor infoChannelWorkAssignor) + { + return new ChannelTaskExecutor( + taskExecutor, + infoChannel, + infoChannelConsumer, + infoChannelWorkAssignor); + } + + /** + * The definition of this bean has to be overruled, so + * that the configuration of the `initMethod`, which + * has to be called explicitly, _after_ the messages + * were sent to and received by the test-culster, can + * be dropped. + */ + @Bean(destroyMethod = "join") + ChannelTaskExecutor dataChannelTaskExecutor( + ThreadPoolTaskExecutor taskExecutor, + DataChannel dataChannel, + Consumer dataChannelConsumer, + WorkAssignor dataChannelWorkAssignor) + { + return new ChannelTaskExecutor( + taskExecutor, + dataChannel, + dataChannelConsumer, + dataChannelWorkAssignor); + } } - public static void sendAndLoadStoredData( + private static void sendAndLoadStoredData( KafkaTemplate messageTemplate, String infoTopic, String dataTopic)