From: Kai Moritz Date: Fri, 18 Nov 2022 16:18:01 +0000 (+0100) Subject: Die App beendet sich, wenn der SimpleConsumer stolpert X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=946f603acf5312814000b45e350cae1d91a84b96;p=demos%2Fkafka%2Ftraining Die App beendet sich, wenn der SimpleConsumer stolpert --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 3828b1d..04dc343 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -7,11 +7,12 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.concurrent.ListenableFuture; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; @SpringBootApplication @@ -24,14 +25,27 @@ public class Application implements ApplicationRunner Consumer kafkaConsumer; @Autowired SimpleConsumer simpleConsumer; + @Autowired + ConfigurableApplicationContext context; - Future consumerJob; + ListenableFuture consumerJob; @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting SimpleConsumer"); - consumerJob = taskExecutor.submit(simpleConsumer); + consumerJob = taskExecutor.submitListenable(simpleConsumer); + consumerJob.addCallback( + exitStatus -> + { + log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus); + SpringApplication.exit(context, () -> exitStatus); + }, + t -> + { + log.error("SimpleConsumer exited abnormally!", t); + SpringApplication.exit(context, () -> 2); + }); } @PreDestroy