From 46ba1885fe43ef5f0f9b845f22cd61d707df94cc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 14 Sep 2023 23:42:59 +0200 Subject: [PATCH] refactor: Introduced `ConsumerTaskExecutor` -- Aligned code * Moved startup-logic in a separate class (`ConsumerTaskExecutor`), so that it is reusable more easily in test scenarios. * `KafkaServicesApplicationRunner` is instanciated via `@Component`, so that it is _not_ instanciated automatically, if the configuration is triggered explicitly via `KafkaServicesConfiguration` in test scenarios. --- .../kafka/ConsumerTaskExecutor.java | 41 +++++++----------- .../kafka/KafkaServicesApplicationRunner.java | 42 ++----------------- .../kafka/KafkaServicesConfiguration.java | 8 ++-- .../kafka/KafkaChatHomeServiceTest.java | 13 +++--- 4 files changed, 32 insertions(+), 72 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java index b635dfc1..9ebc26b2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java @@ -5,53 +5,44 @@ import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import java.util.List; import java.util.concurrent.CompletableFuture; -@ConditionalOnProperty( - prefix = "chat.backend", - name = "services", - havingValue = "kafka") @RequiredArgsConstructor @Slf4j -public class KafkaServicesApplicationRunner implements ApplicationRunner +public class ConsumerTaskExecutor { private final ThreadPoolTaskExecutor taskExecutor; - private final ChatRoomChannel chatRoomChannel; - private final Consumer chatRoomChannelConsumer; + private final Runnable consumerTask; + private final Consumer consumer; private final WorkAssignor workAssignor; - CompletableFuture chatRoomChannelConsumerJob; + CompletableFuture consumerTaskJob; - @Override - public void run(ApplicationArguments args) throws Exception + public void executeConsumerTask() { - workAssignor.assignWork(chatRoomChannelConsumer); - log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor - .submitCompletable(chatRoomChannel) + workAssignor.assignWork(consumer); + log.info("Starting the consumer-task for {}", consumerTask); + consumerTaskJob = taskExecutor + .submitCompletable(consumerTask) .exceptionally(e -> { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + log.error("The consumer-task for {} exited abnormally!", consumerTask, e); return null; }); } @PreDestroy - public void joinChatRoomChannelConsumerJob() + public void joinConsumerTaskJob() { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - chatRoomChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatRoomChannel"); + log.info("Signaling the consumer-task for {} to quit its work", consumerTask); + consumer.wakeup(); + log.info("Waiting for the consumer of {} to finish its work", consumerTask); + consumerTaskJob.join(); + log.info("Joined the consumer-task for {}", consumerTask); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index b635dfc1..da7ee75d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -1,62 +1,28 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import java.util.List; -import java.util.concurrent.CompletableFuture; +import org.springframework.stereotype.Component; @ConditionalOnProperty( prefix = "chat.backend", name = "services", havingValue = "kafka") +@Component @RequiredArgsConstructor @Slf4j public class KafkaServicesApplicationRunner implements ApplicationRunner { - private final ThreadPoolTaskExecutor taskExecutor; - private final ChatRoomChannel chatRoomChannel; - private final Consumer chatRoomChannelConsumer; - private final WorkAssignor workAssignor; - - CompletableFuture chatRoomChannelConsumerJob; + private final ConsumerTaskExecutor chatRoomChannelTaskExecutor; @Override public void run(ApplicationArguments args) throws Exception { - workAssignor.assignWork(chatRoomChannelConsumer); - log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor - .submitCompletable(chatRoomChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); - return null; - }); - } - - @PreDestroy - public void joinChatRoomChannelConsumerJob() - { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - chatRoomChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatRoomChannel"); - } - - - interface WorkAssignor - { - void assignWork(Consumer consumer); + chatRoomChannelTaskExecutor.executeConsumerTask(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index d17c79c9..cda0b94f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -36,13 +36,13 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - KafkaServicesApplicationRunner kafkaServicesApplicationRunner( + ConsumerTaskExecutor chatRoomChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, ChatRoomChannel chatRoomChannel, Consumer chatRoomChannelConsumer, - KafkaServicesApplicationRunner.WorkAssignor workAssignor) + ConsumerTaskExecutor.WorkAssignor workAssignor) { - return new KafkaServicesApplicationRunner( + return new ConsumerTaskExecutor( taskExecutor, chatRoomChannel, chatRoomChannelConsumer, @@ -50,7 +50,7 @@ public class KafkaServicesConfiguration } @Bean - KafkaServicesApplicationRunner.WorkAssignor workAssignor( + ConsumerTaskExecutor.WorkAssignor workAssignor( ChatBackendProperties properties, ChatRoomChannel chatRoomChannel) { diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 204b34e6..4aa362d7 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -8,7 +8,6 @@ import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.DefaultApplicationArguments; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -55,7 +54,7 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest static class KafkaChatHomeTestConfiguration { @Bean - KafkaServicesApplicationRunner.WorkAssignor workAssignor( + ConsumerTaskExecutor.WorkAssignor workAssignor( ChatRoomChannel chatRoomChannel) { return consumer -> @@ -76,13 +75,17 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll - public static void sendAndLoadStoredData(@Autowired KafkaTemplate messageTemplate) + public static void sendAndLoadStoredData( + @Autowired ConsumerTaskExecutor consumerTaskExecutor, + @Autowired KafkaTemplate messageTemplate) { send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); + + consumerTaskExecutor.executeConsumerTask(); } static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) @@ -98,8 +101,8 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest } @AfterAll - static void joinConsumerJob(@Autowired KafkaServicesApplicationRunner applicationRunner) + static void joinConsumerJob(@Autowired ConsumerTaskExecutor consumerTaskExecutor) { - applicationRunner.joinChatRoomChannelConsumerJob(); + consumerTaskExecutor.joinConsumerTaskJob(); } } -- 2.20.1