X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplication.java;h=3828b1d197a7b1d0cd0c078c61fa8d43d8e0a765;hb=46849dd63a31b524ba1f4565e35c3ed8787464b0;hp=b4a960d47e5d8628c52d721d31fd30a53974394b;hpb=377840107151d9c270f7e3a91a118dce4aa1295f;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index b4a960d..3828b1d 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -7,12 +7,11 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; @SpringBootApplication @@ -20,29 +19,29 @@ import java.util.concurrent.TimeUnit; public class Application implements ApplicationRunner { @Autowired - Consumer consumer; + ThreadPoolTaskExecutor taskExecutor; + @Autowired + Consumer kafkaConsumer; @Autowired SimpleConsumer simpleConsumer; + Future consumerJob; @Override public void run(ApplicationArguments args) throws Exception { - log.info("Starting EndlessConsumer"); - simpleConsumer.start(); + log.info("Starting SimpleConsumer"); + consumerJob = taskExecutor.submit(simpleConsumer); } @PreDestroy - public void shutdown() - { - log.info("Signaling the consumer to quit its work"); - consumer.wakeup(); - } - - @Bean(destroyMethod = "close") - public Consumer kafkaConsumer(ConsumerFactory factory) + public void shutdown() throws ExecutionException, InterruptedException { - return factory.createConsumer(); + log.info("Signaling SimpleConsumer to quit its work"); + kafkaConsumer.wakeup(); + log.info("Waiting for SimpleConsumer to finish its work"); + consumerJob.get(); + log.info("SimpleConsumer finished its work"); }