+ @Autowired
+ ThreadPoolTaskExecutor taskExecutor;
+ @Autowired
+ ConfigurableApplicationContext context;
+
+ @Autowired
+ ChatMessageChannel chatMessageChannel;
+
+ CompletableFuture<Optional<Exception>> chatRoomChannelConsumerJob;
+ CompletableFuture<Optional<Exception>> chatMessageChannelConsumerJob;
+
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception
+ {
+ log.info("Starting the consumer for the ChatRoomChannel");
+ chatRoomChannelConsumerJob = taskExecutor.submitCompletable(chatMessageChannel);
+ chatRoomChannelConsumerJob.thenAccept(exceptionOptional ->
+ {
+ exceptionOptional.ifPresent();
+ log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
+ SpringApplication.exit(context, () -> exitStatus);
+ },
+ t ->
+ {
+ log.error("SimpleConsumer exited abnormally!", t);
+ SpringApplication.exit(context, () -> 2);
+ });
+ }
+
+ @PreDestroy
+ public void shutdown() throws ExecutionException, InterruptedException
+ {
+ log.info("Signaling SimpleConsumer to quit its work");
+ kafkaConsumer.wakeup();
+ log.info("Waiting for SimpleConsumer to finish its work");
+ consumerJob.get();
+ log.info("SimpleConsumer finished its work");
+ }
+
+