import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
@SpringBootApplication
Consumer<?, ?> kafkaConsumer;
@Autowired
SimpleConsumer simpleConsumer;
+ @Autowired
+ ConfigurableApplicationContext context;
- Future<?> consumerJob;
+ ListenableFuture<Integer> consumerJob;
@Override
public void run(ApplicationArguments args) throws Exception
{
log.info("Starting SimpleConsumer");
- consumerJob = taskExecutor.submit(simpleConsumer);
+ consumerJob = taskExecutor.submitListenable(simpleConsumer);
+ consumerJob.addCallback(
+ exitStatus ->
+ {
+ log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
+ SpringApplication.exit(context, () -> exitStatus);
+ },
+ t ->
+ {
+ log.error("SimpleConsumer exited abnormally!", t);
+ SpringApplication.exit(context, () -> 2);
+ });
}
@PreDestroy