X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplication.java;h=718676b95e10c3590aeb7f185e0ee30da7bbcf21;hb=1e858c1835d47259583cf3cce18921c550682e2b;hp=3157ef64119513d9be5e0f18b9ce37f1eaedf750;hpb=ae796c9e9e4ec5c86853aa9b100d0cfe4deeda3c;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 3157ef6..718676b 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -19,7 +19,7 @@ import java.util.concurrent.ExecutionException; public class Application implements ApplicationRunner { @Autowired - Consumer consumer; + Consumer kafkaConsumer; @Autowired SimpleConsumer simpleConsumer; @@ -27,14 +27,25 @@ public class Application implements ApplicationRunner public void run(ApplicationArguments args) throws Exception { log.info("Starting SimpleConsumer"); - simpleConsumer.start(); + simpleConsumer.run(); } @PreDestroy public void stop() throws ExecutionException, InterruptedException { log.info("Signaling SimpleConsumer to quit its work"); - consumer.wakeup(); + kafkaConsumer.wakeup(); + + while (simpleConsumer.isRunning()) + { + log.info("Waiting for SimpleConsumer to finish its work"); + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) {} + } + log.info("SimpleConsumer finished its work"); }