refactor: Introduced `ConsumerTaskExecutor` -- Aligned code
authorKai Moritz <kai@juplo.de>
Thu, 14 Sep 2023 21:42:59 +0000 (23:42 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 27 Jan 2024 14:16:22 +0000 (15:16 +0100)
* Moved startup-logic in a separate class (`ConsumerTaskExecutor`), so
  that it is reusable more easily in test scenarios.
* `KafkaServicesApplicationRunner` is instanciated via `@Component`, so
  that it is _not_ instanciated automatically, if the configuration is
  triggered explicitly via `KafkaServicesConfiguration` in test scenarios.

src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java

index b635dfc..9ebc26b 100644 (file)
@@ -5,53 +5,44 @@ import jakarta.annotation.PreDestroy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 
-@ConditionalOnProperty(
-    prefix = "chat.backend",
-    name = "services",
-    havingValue = "kafka")
 @RequiredArgsConstructor
 @Slf4j
-public class KafkaServicesApplicationRunner implements ApplicationRunner
+public class ConsumerTaskExecutor
 {
   private final ThreadPoolTaskExecutor taskExecutor;
-  private final ChatRoomChannel chatRoomChannel;
-  private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+  private final Runnable consumerTask;
+  private final Consumer<String, AbstractMessageTo> consumer;
   private final WorkAssignor workAssignor;
 
-  CompletableFuture<Void> chatRoomChannelConsumerJob;
+  CompletableFuture<Void> consumerTaskJob;
 
 
-  @Override
-  public void run(ApplicationArguments args) throws Exception
+  public void executeConsumerTask()
   {
-    workAssignor.assignWork(chatRoomChannelConsumer);
-    log.info("Starting the consumer for the ChatRoomChannel");
-    chatRoomChannelConsumerJob = taskExecutor
-        .submitCompletable(chatRoomChannel)
+    workAssignor.assignWork(consumer);
+    log.info("Starting the consumer-task for {}", consumerTask);
+    consumerTaskJob = taskExecutor
+        .submitCompletable(consumerTask)
         .exceptionally(e ->
         {
-          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
           return null;
         });
   }
 
   @PreDestroy
-  public void joinChatRoomChannelConsumerJob()
+  public void joinConsumerTaskJob()
   {
-    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatRoomChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    chatRoomChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
+    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+    consumer.wakeup();
+    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
+    consumerTaskJob.join();
+    log.info("Joined the consumer-task for {}", consumerTask);
   }
 
 
index b635dfc..da7ee75 100644 (file)
@@ -1,62 +1,28 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import jakarta.annotation.PreDestroy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import org.springframework.stereotype.Component;
 
 
 @ConditionalOnProperty(
     prefix = "chat.backend",
     name = "services",
     havingValue = "kafka")
+@Component
 @RequiredArgsConstructor
 @Slf4j
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
-  private final ThreadPoolTaskExecutor taskExecutor;
-  private final ChatRoomChannel chatRoomChannel;
-  private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
-  private final WorkAssignor workAssignor;
-
-  CompletableFuture<Void> chatRoomChannelConsumerJob;
+  private final ConsumerTaskExecutor chatRoomChannelTaskExecutor;
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
-    workAssignor.assignWork(chatRoomChannelConsumer);
-    log.info("Starting the consumer for the ChatRoomChannel");
-    chatRoomChannelConsumerJob = taskExecutor
-        .submitCompletable(chatRoomChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
-          return null;
-        });
-  }
-
-  @PreDestroy
-  public void joinChatRoomChannelConsumerJob()
-  {
-    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatRoomChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    chatRoomChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
-  }
-
-
-  interface WorkAssignor
-  {
-    void assignWork(Consumer<?, ?> consumer);
+    chatRoomChannelTaskExecutor.executeConsumerTask();
   }
 }
index d17c79c..cda0b94 100644 (file)
@@ -36,13 +36,13 @@ import java.util.Properties;
 public class KafkaServicesConfiguration
 {
   @Bean
-  KafkaServicesApplicationRunner kafkaServicesApplicationRunner(
+  ConsumerTaskExecutor chatRoomChannelTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       ChatRoomChannel chatRoomChannel,
       Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
-      KafkaServicesApplicationRunner.WorkAssignor workAssignor)
+      ConsumerTaskExecutor.WorkAssignor workAssignor)
   {
-    return new KafkaServicesApplicationRunner(
+    return new ConsumerTaskExecutor(
         taskExecutor,
         chatRoomChannel,
         chatRoomChannelConsumer,
@@ -50,7 +50,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  KafkaServicesApplicationRunner.WorkAssignor workAssignor(
+  ConsumerTaskExecutor.WorkAssignor workAssignor(
       ChatBackendProperties properties,
       ChatRoomChannel chatRoomChannel)
   {
index 204b34e..4aa362d 100644 (file)
@@ -8,7 +8,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.DefaultApplicationArguments;
 import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -55,7 +54,7 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   static class KafkaChatHomeTestConfiguration
   {
     @Bean
-    KafkaServicesApplicationRunner.WorkAssignor workAssignor(
+    ConsumerTaskExecutor.WorkAssignor workAssignor(
         ChatRoomChannel chatRoomChannel)
     {
       return consumer ->
@@ -76,13 +75,17 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
 
   @BeforeAll
-  public static void sendAndLoadStoredData(@Autowired KafkaTemplate<String, String> messageTemplate)
+  public static void sendAndLoadStoredData(
+      @Autowired ConsumerTaskExecutor consumerTaskExecutor,
+      @Autowired KafkaTemplate<String, String> messageTemplate)
   {
     send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+
+    consumerTaskExecutor.executeConsumerTask();
   }
 
   static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
@@ -98,8 +101,8 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   }
 
   @AfterAll
-  static void joinConsumerJob(@Autowired KafkaServicesApplicationRunner applicationRunner)
+  static void joinConsumerJob(@Autowired ConsumerTaskExecutor consumerTaskExecutor)
   {
-    applicationRunner.joinChatRoomChannelConsumerJob();
+    consumerTaskExecutor.joinConsumerTaskJob();
   }
 }