- @Autowired
- ThreadPoolTaskExecutor taskExecutor;
- @Autowired
- Consumer<?, ?> kafkaConsumer;
- @Autowired
- SimpleConsumer simpleConsumer;
-
- Future<?> consumerJob;
-
- @Override
- public void run(ApplicationArguments args) throws Exception
- {
- log.info("Starting SimpleConsumer");
- consumerJob = taskExecutor.submit(simpleConsumer);
- }
-
- @PreDestroy
- public void shutdown() throws ExecutionException, InterruptedException
- {
- log.info("Signaling SimpleConsumer to quit its work");
- kafkaConsumer.wakeup();
- log.info("Waiting for SimpleConsumer to finish its work");
- consumerJob.get();
- log.info("SimpleConsumer finished its work");
- }
-
-