From ce840f48340d55613291fca468bf10b834c473db Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sun, 24 Jul 2022 17:40:36 +0200
Subject: [PATCH] =?utf8?q?Fehler=20im=20Shutdown-Code=20korrigiert:=20Shut?=
 =?utf8?q?down=20von=20`EndlessConsumer`=20zu=20sp=C3=A4t?=
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 src/main/java/de/juplo/kafka/Application.java  | 18 ++++++++++++++++--
 .../java/de/juplo/kafka/EndlessConsumer.java   | 17 +----------------
 2 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index d280aa6..76c2520 100644
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -30,8 +30,22 @@ public class Application implements ApplicationRunner
   }
 
   @PreDestroy
-  public void stopExecutor()
+  public void shutdown()
   {
+    try
+    {
+      log.info("Stopping EndlessConsumer");
+      endlessConsumer.stop();
+    }
+    catch (IllegalStateException e)
+    {
+      log.info("Was already stopped: {}", e.toString());
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
+    }
+
     try
     {
       log.info("Shutting down the ExecutorService.");
@@ -41,7 +55,7 @@ public class Application implements ApplicationRunner
     }
     catch (InterruptedException e)
     {
-      log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString());
+      log.error("Exception while waiting for the termination of the ExecutorService: {}", e);
     }
     finally
     {
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 7e243a9..3d154c2 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -243,22 +243,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   public void destroy() throws ExecutionException, InterruptedException
   {
     log.info("{} - Destroy!", id);
-    try
-    {
-      stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("{} - Was already stopped", id);
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
-    }
-    finally
-    {
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-    }
+    log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
   }
 
   public boolean running()
-- 
2.20.1