WIP
[demos/kafka/seek] / src / main / java / de / juplo / kafka / seek / Consumer.java
index 7376945..836ca13 100644 (file)
@@ -57,13 +57,10 @@ public class Consumer implements Runnable
   @Override
   public void run()
   {
-    log.info("{} - Subscribing to topic test", id);
-    consumer.subscribe(Arrays.asList(topic));
-
     try
     {
-
-      running = true;
+      log.info("{} - Subscribing to topic test", id);
+      consumer.subscribe(Arrays.asList(topic));
 
       while (running)
       {
@@ -102,8 +99,8 @@ public class Consumer implements Runnable
     finally
     {
       log.info("{} - Unsubscribing...", id);
-      consumer.unsubscribe();
       running = false;
+      consumer.unsubscribe();
       offset = null;
     }
   }
@@ -121,6 +118,7 @@ public class Consumer implements Runnable
       throw new RuntimeException("Consumier instance " + id + " is already running!");
 
     log.info("Running {}", id);
+    running = true;
     future = executor.submit(this);
   }
 
@@ -135,7 +133,6 @@ public class Consumer implements Runnable
     future.get();
   }
 
-
   @PreDestroy
   public void destroy() throws ExecutionException, InterruptedException
   {