WIP
[demos/kafka/seek] / src / main / java / de / juplo / kafka / seek / Consumer.java
index 7376945..77e156b 100644 (file)
@@ -57,13 +57,10 @@ public class Consumer implements Runnable
   @Override
   public void run()
   {
-    log.info("{} - Subscribing to topic test", id);
-    consumer.subscribe(Arrays.asList(topic));
-
     try
     {
-
-      running = true;
+      log.info("{} - Subscribing to topic test", id);
+      consumer.subscribe(Arrays.asList(topic));
 
       while (running)
       {
@@ -102,8 +99,8 @@ public class Consumer implements Runnable
     finally
     {
       log.info("{} - Unsubscribing...", id);
-      consumer.unsubscribe();
       running = false;
+      consumer.unsubscribe();
       offset = null;
     }
   }
@@ -121,13 +118,14 @@ public class Consumer implements Runnable
       throw new RuntimeException("Consumier instance " + id + " is already running!");
 
     log.info("Running {}", id);
+    running = true;
     future = executor.submit(this);
   }
 
   public synchronized void stop() throws ExecutionException, InterruptedException
   {
     if (!running)
-      throw new RuntimeException("Consumier instance " + id + " is not running!");
+      throw new RuntimeException("Consumer instance " + id + " is not running!");
 
     log.info("Stopping {}", id);
     running = false;
@@ -135,7 +133,6 @@ public class Consumer implements Runnable
     future.get();
   }
 
-
   @PreDestroy
   public void destroy() throws ExecutionException, InterruptedException
   {