04036d88cdff58ceef3058258512e4bb3f9ff31e
[demos/kafka/training] / src / main / java / de / juplo / kafka / Application.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.boot.ApplicationArguments;
7 import org.springframework.boot.ApplicationRunner;
8 import org.springframework.boot.SpringApplication;
9 import org.springframework.boot.autoconfigure.SpringBootApplication;
10 import org.springframework.context.annotation.Bean;
11 import org.springframework.kafka.core.ConsumerFactory;
12
13 import javax.annotation.PreDestroy;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Future;
17
18
19 @SpringBootApplication
20 @Slf4j
21 public class Application implements ApplicationRunner
22 {
23   @Autowired
24   ExecutorService executorService;
25   @Autowired
26   Consumer<?, ?> consumer;
27   @Autowired
28   SimpleConsumer simpleConsumer;
29
30   Future<?> consumerJob;
31
32   @Override
33   public void run(ApplicationArguments args) throws Exception
34   {
35     log.info("Starting SimpleConsumer");
36     consumerJob = executorService.submit(simpleConsumer);
37   }
38
39   @PreDestroy
40   public void shutdown() throws ExecutionException, InterruptedException
41   {
42     log.info("Signaling SimpleConsumer to quit its work");
43     consumer.wakeup();
44     log.info("Waiting for SimpleConsumer to finish its work");
45     consumerJob.get();
46     log.info("SimpleConsumer finished its work");
47   }
48
49
50   public static void main(String[] args)
51   {
52     SpringApplication.run(Application.class, args);
53   }
54 }