Springify: Start/Stop prüft, ob der Container schon/noch läuft
authorKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 09:29:48 +0000 (11:29 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 11:56:25 +0000 (13:56 +0200)
src/main/java/de/juplo/kafka/EndlessConsumer.java

index ea899cc..a5a5ce6 100644 (file)
@@ -59,12 +59,18 @@ public class EndlessConsumer<K, V>
 
   public synchronized void start()
   {
+    if (registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
     log.info("{} - Starting - consumed {} messages before", id, consumed);
     registry.getListenerContainer(id).start();
   }
 
   public synchronized void stop()
   {
+    if (!registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
     log.info("{} - Stopping", id);
     registry.getListenerContainer(id).stop();
     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
@@ -74,6 +80,21 @@ public class EndlessConsumer<K, V>
   public void destroy()
   {
     log.info("{} - Destroy!", id);
-    stop();
+    try
+    {
+      stop();
+    }
+    catch (IllegalStateException e)
+    {
+      log.info("{} - Was already stopped", id);
+    }
+    catch (Exception e)
+    {
+      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
+    }
+    finally
+    {
+      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+    }
   }
 }