]> juplo.de Git - demos/kafka/training/commitdiff
Sauberen Shutdown verbessert: Shutdown wird über `SmartLifecycle` initiiert consumer/spring-consumer--2026-03-20--19-06 consumer/spring-consumer--2026-03-21--smartlifecycle-only
authorKai Moritz <kai@juplo.de>
Fri, 20 Mar 2026 16:44:53 +0000 (17:44 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 20 Mar 2026 17:09:41 +0000 (18:09 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index ea6b64ea805b15416d0d5a378b68b6edd0f84c4a..a45c1505d4697303b999cba64acac0a32d0f16e9 100644 (file)
@@ -25,8 +25,7 @@ public class ApplicationConfiguration
       new ExampleConsumer(
         properties.getClientId(),
         properties.getConsumerProperties().getTopic(),
-        kafkaConsumer,
-        () -> applicationContext.close());
+        kafkaConsumer);
   }
 
   @Bean(destroyMethod = "")
index 1f5a5706c875604947c467cd32611c4135c6a01e..0c06b10f29d6ebe9d802df5b02e476cd9614efd4 100644 (file)
@@ -5,37 +5,45 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.context.SmartLifecycle;
 
 import java.time.Duration;
 import java.util.Arrays;
 
 
 @Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer implements Runnable, SmartLifecycle
 {
   private final String id;
   private final String topic;
   private final Consumer<String, String> consumer;
-  private final Thread workerThread;
-  private final Runnable closeCallback;
 
+  private Thread workerThread;
+  private volatile boolean running = false;
   private long consumed = 0;
 
 
   public ExampleConsumer(
     String clientId,
     String topic,
-    Consumer<String, String> consumer,
-    Runnable closeCallback)
+    Consumer<String, String> consumer)
   {
     this.id = clientId;
     this.topic = topic;
     this.consumer = consumer;
+  }
 
+  @Override
+  public synchronized void start()
+  {
+    if (running)
+    {
+      log.info("{} - Already running!", id);
+      return;
+    }
+    running = true;
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
-
-    this.closeCallback = closeCallback;
   }
 
 
@@ -71,8 +79,6 @@ public class ExampleConsumer implements Runnable
     {
       log.error("{} - Unexpected error, unsubscribing!", id, e);
       consumer.unsubscribe();
-      log.info("{} - Triggering exit of application!", id);
-      new Thread(closeCallback).start();
     }
     finally
     {
@@ -94,11 +100,35 @@ public class ExampleConsumer implements Runnable
   }
 
 
-  public void shutdown() throws InterruptedException
+  @Override
+  public boolean isRunning()
+  {
+    return running;
+  }
+
+  @Override
+  public synchronized void stop()
   {
+    if (!running)
+    {
+      log.info("{} - Not running!", id);
+      return;
+    }
+
+    running = false;
+
     log.info("{} - Waking up the consumer", id);
     consumer.wakeup();
+
     log.info("{} - Joining the worker thread", id);
-    workerThread.join();
+    try
+    {
+      workerThread.join();
+    }
+    catch (InterruptedException e)
+    {
+      Thread.currentThread().interrupt();
+      log.error("{} - Interrupted while waiting for worker thread", id, e);
+    }
   }
 }