Fehlerbehandlung in EndlessConsumer.destroy() korrigiert
authorKai Moritz <kai@juplo.de>
Sat, 9 Apr 2022 16:50:43 +0000 (18:50 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 17:02:23 +0000 (19:02 +0200)
src/main/java/de/juplo/kafka/EndlessConsumer.java

index b3dd446..f25e93c 100644 (file)
@@ -110,7 +110,7 @@ public class EndlessConsumer implements Runnable
   {
     boolean stateChanged = running.compareAndSet(false, true);
     if (!stateChanged)
-      throw new RuntimeException("Consumer instance " + id + " is already running!");
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
     log.info("{} - Starting - consumed {} messages before", id, consumed);
     future = executor.submit(this);
@@ -120,7 +120,7 @@ public class EndlessConsumer implements Runnable
   {
     boolean stateChanged = running.compareAndSet(true, false);
     if (!stateChanged)
-      throw new RuntimeException("Consumer instance " + id + " is not running!");
+      throw new IllegalStateException("Consumer instance " + id + " is not running!");
 
     log.info("{} - Stopping", id);
     consumer.wakeup();
@@ -140,6 +140,10 @@ public class EndlessConsumer implements Runnable
     {
       log.info("{} - Was already stopped", id);
     }
+    catch (Exception e)
+    {
+      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
+    }
     finally
     {
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);