Springify: Start/Stop prüft, ob der Container schon/noch läuft
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index ea899cc..a5a5ce6 100644 (file)
@@ -59,12 +59,18 @@ public class EndlessConsumer<K, V>
 
   public synchronized void start()
   {
+    if (registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
     log.info("{} - Starting - consumed {} messages before", id, consumed);
     registry.getListenerContainer(id).start();
   }
 
   public synchronized void stop()
   {
+    if (!registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
     log.info("{} - Stopping", id);
     registry.getListenerContainer(id).stop();
     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
@@ -74,6 +80,21 @@ public class EndlessConsumer<K, V>
   public void destroy()
   {
     log.info("{} - Destroy!", id);
-    stop();
+    try
+    {
+      stop();
+    }
+    catch (IllegalStateException e)
+    {
+      log.info("{} - Was already stopped", id);
+    }
+    catch (Exception e)
+    {
+      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
+    }
+    finally
+    {
+      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+    }
   }
 }