import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
havingValue = "kafka")
@Component
@RequiredArgsConstructor
+@Slf4j
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
private final ChannelTaskRunner channelTaskRunner;
+ private final Consumer dataChannelConsumer;
+ private final Consumer infoChannelConsumer;
@Override
public void run(ApplicationArguments args)
{
+ log.info("Executing channel-tasks");
channelTaskRunner.executeChannels();
}
@PreDestroy
public void joinChannels() throws InterruptedException
{
+ log.info("Closing consumers");
+ dataChannelConsumer.close();
+ infoChannelConsumer.close();
+ log.info("Joining channel-tasks");
channelTaskRunner.joinChannels();
}
}
@Configuration
public class KafkaServicesConfiguration
{
+ @Bean
+ KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer()
+ {
+ return new KafkaServicesThreadPoolTaskExecutorCustomizer();
+ }
+
@Bean
ChannelTaskRunner channelTaskRunner(
ChannelTaskExecutor infoChannelTaskExecutor,
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.task.ThreadPoolTaskExecutorCustomizer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+
+/**
+ * Customizer for the auto-configured Bean {@code applicationTaskExecutor}.
+ *
+ * This customization is necessary, because otherwise, the bean is part
+ * of the lowest phase of the {@link org.springframework.context.SmartLifecycle}
+ * and, hence, destroyed first during shutdown.
+ *
+ * Without this customization, the shutdown of the thread-pool, that is triggered
+ * this way does not succeed, blocking the furthershutdown for 30 seconds.
+ */
+@Slf4j
+public class KafkaServicesThreadPoolTaskExecutorCustomizer implements ThreadPoolTaskExecutorCustomizer
+{
+ @Override
+ public void customize(ThreadPoolTaskExecutor taskExecutor)
+ {
+ log.info("Customizing the auto-configured ThreadPoolTaskExecutor");
+ taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
+ taskExecutor.setAwaitTerminationSeconds(10);
+ }
+}