X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplication.java;h=3157ef64119513d9be5e0f18b9ce37f1eaedf750;hb=ae796c9e9e4ec5c86853aa9b100d0cfe4deeda3c;hp=ab357c72692aee1216e031eb2864edfe9a746771;hpb=d58ed9de440e56ecf906b26803591290b1c4f60d;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index ab357c7..3157ef6 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -7,49 +7,34 @@ import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.scheduling.annotation.EnableAsync; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; @SpringBootApplication +@EnableAsync @Slf4j public class Application implements ApplicationRunner { @Autowired - ExecutorService executorService; - @Autowired - Consumer consumer; + Consumer consumer; @Autowired SimpleConsumer simpleConsumer; - Future consumerJob; - @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting SimpleConsumer"); - consumerJob = executorService.submit(simpleConsumer); + simpleConsumer.start(); } @PreDestroy - public void shutdown() throws ExecutionException, InterruptedException + public void stop() throws ExecutionException, InterruptedException { log.info("Signaling SimpleConsumer to quit its work"); consumer.wakeup(); - log.info("Waiting for SimpleConsumer to finish its work"); - consumerJob.get(); - log.info("SimpleConsumer finished its work"); - } - - @Bean(destroyMethod = "close") - public Consumer kafkaConsumer(ConsumerFactory factory) - { - return factory.createConsumer(); }