Fehlerbehandlung in EndlessConsumer.destroy() korrigiert
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index b3dd446..f25e93c 100644 (file)
@@ -110,7 +110,7 @@ public class EndlessConsumer implements Runnable
   {
     boolean stateChanged = running.compareAndSet(false, true);
     if (!stateChanged)
-      throw new RuntimeException("Consumer instance " + id + " is already running!");
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
     log.info("{} - Starting - consumed {} messages before", id, consumed);
     future = executor.submit(this);
@@ -120,7 +120,7 @@ public class EndlessConsumer implements Runnable
   {
     boolean stateChanged = running.compareAndSet(true, false);
     if (!stateChanged)
-      throw new RuntimeException("Consumer instance " + id + " is not running!");
+      throw new IllegalStateException("Consumer instance " + id + " is not running!");
 
     log.info("{} - Stopping", id);
     consumer.wakeup();
@@ -140,6 +140,10 @@ public class EndlessConsumer implements Runnable
     {
       log.info("{} - Was already stopped", id);
     }
+    catch (Exception e)
+    {
+      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
+    }
     finally
     {
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);