fix: The shutdown of the application was blocked
authorKai Moritz <kai@juplo.de>
Wed, 6 Mar 2024 07:26:03 +0000 (08:26 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 14 Mar 2024 08:11:21 +0000 (09:11 +0100)
* The auto-configured bean `applicationTaskExecutor` must not block, while
  it is shutting down, because otherwise, it infinitly waits for the
  completion, of the channel-tasks, which are stopped in a _later_ phase
  of the "smart" lifecycle.
* The bean is destroyed first, becaus it is is associated with the lowest
  lifecyle-phase (``Integer.MAX_VALUE``), which apparently cannot be
  overruled by `@DependsOn` (although suggested by the spring-
  documentation)
* Joining the channel-tasks was blocking infinitly, because the tasks were
  waiting for the kafka-consumers to be closed, what only happens _after_
  the joining completes.

src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesThreadPoolTaskExecutorCustomizer.java [new file with mode: 0644]

index badaeed..9d8539f 100644 (file)
@@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import jakarta.annotation.PreDestroy;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -14,20 +16,28 @@ import org.springframework.stereotype.Component;
     havingValue = "kafka")
 @Component
 @RequiredArgsConstructor
+@Slf4j
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
   private final ChannelTaskRunner channelTaskRunner;
+  private final Consumer dataChannelConsumer;
+  private final Consumer infoChannelConsumer;
 
 
   @Override
   public void run(ApplicationArguments args)
   {
+    log.info("Executing channel-tasks");
     channelTaskRunner.executeChannels();
   }
 
   @PreDestroy
   public void joinChannels() throws InterruptedException
   {
+    log.info("Closing consumers");
+    dataChannelConsumer.close();
+    infoChannelConsumer.close();
+    log.info("Joining channel-tasks");
     channelTaskRunner.joinChannels();
   }
 }
index 525c427..54aa41f 100644 (file)
@@ -38,6 +38,12 @@ import java.util.Properties;
 @Configuration
 public class KafkaServicesConfiguration
 {
+  @Bean
+  KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer()
+  {
+    return new KafkaServicesThreadPoolTaskExecutorCustomizer();
+  }
+
   @Bean
   ChannelTaskRunner channelTaskRunner(
       ChannelTaskExecutor infoChannelTaskExecutor,
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesThreadPoolTaskExecutorCustomizer.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesThreadPoolTaskExecutorCustomizer.java
new file mode 100644 (file)
index 0000000..97ea54b
--- /dev/null
@@ -0,0 +1,28 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.task.ThreadPoolTaskExecutorCustomizer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+
+/**
+ * Customizer for the auto-configured Bean {@code applicationTaskExecutor}.
+ *
+ * This customization is necessary, because otherwise, the bean is part
+ * of the lowest phase of the {@link org.springframework.context.SmartLifecycle}
+ * and, hence, destroyed first during shutdown.
+ *
+ * Without this customization, the shutdown of the thread-pool, that is triggered
+ * this way does not succeed, blocking the furthershutdown for 30 seconds.
+ */
+@Slf4j
+public class KafkaServicesThreadPoolTaskExecutorCustomizer implements ThreadPoolTaskExecutorCustomizer
+{
+  @Override
+  public void customize(ThreadPoolTaskExecutor taskExecutor)
+  {
+    log.info("Customizing the auto-configured ThreadPoolTaskExecutor");
+    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
+    taskExecutor.setAwaitTerminationSeconds(10);
+  }
+}