X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplication.java;h=04dc343c490d68fa634f071946573e5e17fd833b;hb=435497f0e0836d8a81052ef0fc73d06cc2f67762;hp=3828b1d197a7b1d0cd0c078c61fa8d43d8e0a765;hpb=39e2fe39ffbeba4fe7cae5e9d1786e51c12714d6;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 3828b1d..04dc343 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -7,11 +7,12 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.concurrent.ListenableFuture; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; @SpringBootApplication @@ -24,14 +25,27 @@ public class Application implements ApplicationRunner Consumer kafkaConsumer; @Autowired SimpleConsumer simpleConsumer; + @Autowired + ConfigurableApplicationContext context; - Future consumerJob; + ListenableFuture consumerJob; @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting SimpleConsumer"); - consumerJob = taskExecutor.submit(simpleConsumer); + consumerJob = taskExecutor.submitListenable(simpleConsumer); + consumerJob.addCallback( + exitStatus -> + { + log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus); + SpringApplication.exit(context, () -> exitStatus); + }, + t -> + { + log.error("SimpleConsumer exited abnormally!", t); + SpringApplication.exit(context, () -> 2); + }); } @PreDestroy