X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fseek;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fseek%2FConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fseek%2FConsumer.java;h=836ca13580d7e0cd513c6acbffcba71193c20080;hp=737694547ad78ddf96d191c6666b9f6cc87aba98;hb=d6b305a86f7006eb52be48a9f233352523437e6d;hpb=206e7920ca828fd52c9d93671ef3d222239c455f diff --git a/src/main/java/de/juplo/kafka/seek/Consumer.java b/src/main/java/de/juplo/kafka/seek/Consumer.java index 7376945..836ca13 100644 --- a/src/main/java/de/juplo/kafka/seek/Consumer.java +++ b/src/main/java/de/juplo/kafka/seek/Consumer.java @@ -57,13 +57,10 @@ public class Consumer implements Runnable @Override public void run() { - log.info("{} - Subscribing to topic test", id); - consumer.subscribe(Arrays.asList(topic)); - try { - - running = true; + log.info("{} - Subscribing to topic test", id); + consumer.subscribe(Arrays.asList(topic)); while (running) { @@ -102,8 +99,8 @@ public class Consumer implements Runnable finally { log.info("{} - Unsubscribing...", id); - consumer.unsubscribe(); running = false; + consumer.unsubscribe(); offset = null; } } @@ -121,6 +118,7 @@ public class Consumer implements Runnable throw new RuntimeException("Consumier instance " + id + " is already running!"); log.info("Running {}", id); + running = true; future = executor.submit(this); } @@ -135,7 +133,6 @@ public class Consumer implements Runnable future.get(); } - @PreDestroy public void destroy() throws ExecutionException, InterruptedException {