From 889b715d4c315d4ce8336c0c2909cee20890126b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 6 Mar 2024 15:34:20 +0100 Subject: [PATCH] refactor: Simplified the configuration for the kafka-services * Removed class `ChannelTaskRunner` and `KafkaServicesApplicationRunner`. * Instead, the method `ChannelTaskExecutor.excuteChannelTask()` is executed as `@Bean.initMethod` by Spring. * Adapted the test-cases accordingly: ** Joinig the channel-tasks is not necessary any more, because that is done by the imported production-config ** `KafkaConfigurationIT` has to call `executeChannelTasks()` explicitly ** Therefore, it has to overrule the default-config for the bean `dataChannelTaskExecutor` in order to drop the configuration of the `initMethod`. ** Otherwise, the test would (might) not restore the data from the topic, because the messages, that are send into the test-cluster, might arrive only after the initial loading of the data is done. --- .../kafka/ChannelTaskRunner.java | 19 ------- .../kafka/KafkaServicesApplicationRunner.java | 29 ---------- .../kafka/KafkaServicesConfiguration.java | 14 +---- .../chat/backend/KafkaConfigurationIT.java | 56 +++++++++++++------ .../kafka/KafkaChatHomeServiceTest.java | 16 +----- .../implementation/kafka/KafkaTestUtils.java | 5 +- 6 files changed, 43 insertions(+), 96 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java deleted file mode 100644 index d329ac6e..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - - -@RequiredArgsConstructor -@Slf4j -public class ChannelTaskRunner -{ - private final ChannelTaskExecutor infoChannelTaskExecutor; - private final ChannelTaskExecutor dataChannelTaskExecutor; - - public void executeChannels() - { - infoChannelTaskExecutor.executeChannelTask(); - dataChannelTaskExecutor.executeChannelTask(); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java deleted file mode 100644 index 16b47414..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ /dev/null @@ -1,29 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - - -@ConditionalOnProperty( - prefix = "chat.backend", - name = "services", - havingValue = "kafka") -@Component -@RequiredArgsConstructor -@Slf4j -public class KafkaServicesApplicationRunner implements ApplicationRunner -{ - private final ChannelTaskRunner channelTaskRunner; - - - @Override - public void run(ApplicationArguments args) - { - log.info("Executing channel-tasks"); - channelTaskRunner.executeChannels(); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index b28b6903..bebed14b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -44,17 +44,7 @@ public class KafkaServicesConfiguration return new KafkaServicesThreadPoolTaskExecutorCustomizer(); } - @Bean - ChannelTaskRunner channelTaskRunner( - ChannelTaskExecutor infoChannelTaskExecutor, - ChannelTaskExecutor dataChannelTaskExecutor) - { - return new ChannelTaskRunner( - infoChannelTaskExecutor, - dataChannelTaskExecutor); - } - - @Bean(destroyMethod = "join") + @Bean(initMethod = "executeChannelTask", destroyMethod = "join") ChannelTaskExecutor infoChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, @@ -84,7 +74,7 @@ public class KafkaServicesConfiguration }; } - @Bean(destroyMethod = "join") + @Bean(initMethod = "executeChannelTask", destroyMethod = "join") ChannelTaskExecutor dataChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index 75097dcb..1fa767fa 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -1,16 +1,21 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.implementation.kafka.*; +import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor; +import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel; +import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils; +import de.juplo.kafka.chat.backend.implementation.kafka.WorkAssignor; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterAll; +import org.apache.kafka.clients.consumer.Consumer; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC; import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC; @@ -37,28 +42,23 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL"; final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL"; - @MockBean - KafkaServicesApplicationRunner kafkaServicesApplicationRunner; - @BeforeAll - public static void sendAndLoadStoredData( + static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, - @Autowired ChannelTaskRunner channelTaskRunner) + @Autowired ChannelTaskExecutor dataChannelTaskExecutor) { KafkaTestUtils.sendAndLoadStoredData( messageTemplate, INFO_TOPIC, - DATA_TOPIC, - channelTaskRunner); - } + DATA_TOPIC); - @AfterAll - static void joinChannels( - @Autowired ChannelTaskExecutor dataChannelTaskExecutor, - @Autowired ChannelTaskExecutor infoChannelTaskExecutor) - { - dataChannelTaskExecutor.join(); - infoChannelTaskExecutor.join(); + // The initialization of the data-channel must happen, + // after the messages were sent into the topic of the + // test-cluster. + // Otherwise, the initial loading of the data might be + // completed, before these messages arrive, so that + // they are ignored and the state is never restored. + dataChannelTaskExecutor.executeChannelTask(); } @@ -66,5 +66,25 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @Import(KafkaTestUtils.KafkaTestConfiguration.class) static class KafkaConfigurationITConfiguration { + /** + * The definition of this bean has to be overruled, so + * that the configuration of the `initMethod`, which + * has to be called explicitly, _after_ the messages + * were sent to and received by the test-culster, can + * be dropped. + */ + @Bean(destroyMethod = "join") + ChannelTaskExecutor dataChannelTaskExecutor( + ThreadPoolTaskExecutor taskExecutor, + DataChannel dataChannel, + Consumer dataChannelConsumer, + WorkAssignor dataChannelWorkAssignor) + { + return new ChannelTaskExecutor( + taskExecutor, + dataChannel, + dataChannelConsumer, + dataChannelWorkAssignor); + } } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 72422c89..1c089c3f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; @@ -43,22 +42,11 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll static void sendAndLoadStoredData( - @Autowired KafkaTemplate messageTemplate, - @Autowired ChannelTaskRunner channelTaskRunner) + @Autowired KafkaTemplate messageTemplate) { KafkaTestUtils.sendAndLoadStoredData( messageTemplate, INFO_TOPIC, - DATA_TOPIC, - channelTaskRunner); - } - - @AfterAll - static void joinChannels( - @Autowired ChannelTaskExecutor dataChannelTaskExecutor, - @Autowired ChannelTaskExecutor infoChannelTaskExecutor) - { - dataChannelTaskExecutor.join(); - infoChannelTaskExecutor.join(); + DATA_TOPIC); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index 6ea4772e..c9163976 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -43,16 +43,13 @@ public abstract class KafkaTestUtils public static void sendAndLoadStoredData( KafkaTemplate messageTemplate, String infoTopic, - String dataTopic, - ChannelTaskRunner channelTaskRunner) + String dataTopic) { send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - - channelTaskRunner.executeChannels(); } private static void send( -- 2.20.1