Merge der Refaktorisierung des EndlessConsumer (Branch 'deserialization')
[demos/kafka/training] / src / main / java / de / juplo / kafka / Application.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.boot.ApplicationArguments;
6 import org.springframework.boot.ApplicationRunner;
7 import org.springframework.boot.SpringApplication;
8 import org.springframework.boot.autoconfigure.SpringBootApplication;
9
10 import javax.annotation.PreDestroy;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.TimeUnit;
13
14
15 @SpringBootApplication
16 @Slf4j
17 public class Application implements ApplicationRunner
18 {
19   @Autowired
20   EndlessConsumer endlessConsumer;
21   @Autowired
22   ExecutorService executor;
23
24
25   @Override
26   public void run(ApplicationArguments args) throws Exception
27   {
28     log.info("Starting EndlessConsumer");
29     endlessConsumer.start();
30   }
31
32   @PreDestroy
33   public void stopExecutor()
34   {
35     try
36     {
37       log.info("Shutting down the ExecutorService.");
38       executor.shutdown();
39       log.info("Waiting 5 seconds for the ExecutorService to terminate...");
40       executor.awaitTermination(5, TimeUnit.SECONDS);
41     }
42     catch (InterruptedException e)
43     {
44       log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString());
45     }
46     finally
47     {
48       if (!executor.isTerminated())
49       {
50         log.warn("Forcing shutdown of ExecutorService!");
51         executor
52             .shutdownNow()
53             .forEach(runnable -> log.warn("Unprocessed task: {}", runnable.getClass().getSimpleName()));
54       }
55       log.info("Shutdow of ExecutorService finished");
56     }
57   }
58
59
60   public static void main(String[] args)
61   {
62     SpringApplication.run(Application.class, args);
63   }
64 }