Fehler im Shutdown-Code korrigiert: Shutdown von `EndlessConsumer` zu spät
authorKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 15:40:36 +0000 (17:40 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 18:30:37 +0000 (20:30 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index d280aa6..76c2520 100644 (file)
@@ -30,8 +30,22 @@ public class Application implements ApplicationRunner
   }
 
   @PreDestroy
-  public void stopExecutor()
+  public void shutdown()
   {
+    try
+    {
+      log.info("Stopping EndlessConsumer");
+      endlessConsumer.stop();
+    }
+    catch (IllegalStateException e)
+    {
+      log.info("Was already stopped: {}", e.toString());
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
+    }
+
     try
     {
       log.info("Shutting down the ExecutorService.");
@@ -41,7 +55,7 @@ public class Application implements ApplicationRunner
     }
     catch (InterruptedException e)
     {
-      log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString());
+      log.error("Exception while waiting for the termination of the ExecutorService: {}", e);
     }
     finally
     {
index 86bdbb5..66fea35 100644 (file)
@@ -245,22 +245,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   public void destroy() throws ExecutionException, InterruptedException
   {
     log.info("{} - Destroy!", id);
-    try
-    {
-      stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("{} - Was already stopped", id);
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
-    }
-    finally
-    {
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-    }
+    log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
   }
 
   public boolean running()