From 435497f0e0836d8a81052ef0fc73d06cc2f67762 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 18 Nov 2022 17:18:01 +0100 Subject: [PATCH] Die App beendet sich, wenn der SimpleConsumer stolpert --- src/main/java/de/juplo/kafka/Application.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 3828b1d..04dc343 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -7,11 +7,12 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.concurrent.ListenableFuture; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; @SpringBootApplication @@ -24,14 +25,27 @@ public class Application implements ApplicationRunner Consumer kafkaConsumer; @Autowired SimpleConsumer simpleConsumer; + @Autowired + ConfigurableApplicationContext context; - Future consumerJob; + ListenableFuture consumerJob; @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting SimpleConsumer"); - consumerJob = taskExecutor.submit(simpleConsumer); + consumerJob = taskExecutor.submitListenable(simpleConsumer); + consumerJob.addCallback( + exitStatus -> + { + log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus); + SpringApplication.exit(context, () -> exitStatus); + }, + t -> + { + log.error("SimpleConsumer exited abnormally!", t); + SpringApplication.exit(context, () -> 2); + }); } @PreDestroy -- 2.20.1