Die App beendet sich, wenn der SimpleConsumer stolpert
authorKai Moritz <kai@juplo.de>
Fri, 18 Nov 2022 16:18:01 +0000 (17:18 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Oct 2023 17:08:29 +0000 (19:08 +0200)
src/main/java/de/juplo/kafka/Application.java

index 3828b1d..04dc343 100644 (file)
@@ -7,11 +7,12 @@ import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.concurrent.ListenableFuture;
 
 import javax.annotation.PreDestroy;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 
 @SpringBootApplication
@@ -24,14 +25,27 @@ public class Application implements ApplicationRunner
   Consumer<?, ?> kafkaConsumer;
   @Autowired
   SimpleConsumer simpleConsumer;
+  @Autowired
+  ConfigurableApplicationContext context;
 
-  Future<?> consumerJob;
+  ListenableFuture<Integer> consumerJob;
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
     log.info("Starting SimpleConsumer");
-    consumerJob = taskExecutor.submit(simpleConsumer);
+    consumerJob = taskExecutor.submitListenable(simpleConsumer);
+    consumerJob.addCallback(
+      exitStatus ->
+      {
+        log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
+        SpringApplication.exit(context, () -> exitStatus);
+        },
+      t ->
+      {
+        log.error("SimpleConsumer exited abnormally!", t);
+        SpringApplication.exit(context, () -> 2);
+      });
   }
 
   @PreDestroy