From a523851ae5c9717d06fa7d7e2ade73ffb6cfb8cc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 20 Mar 2026 17:44:53 +0100 Subject: [PATCH] =?utf8?q?Sauberen=20Shutdown=20verbessert:=20Shutdown=20w?= =?utf8?q?ird=20=C3=BCber=20`SmartLifecycle`=20initiiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 3 +- .../java/de/juplo/kafka/ExampleConsumer.java | 52 +++++++++++++++---- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index ea6b64ea..a45c1505 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -25,8 +25,7 @@ public class ApplicationConfiguration new ExampleConsumer( properties.getClientId(), properties.getConsumerProperties().getTopic(), - kafkaConsumer, - () -> applicationContext.close()); + kafkaConsumer); } @Bean(destroyMethod = "") diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 1f5a5706..0c06b10f 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -5,37 +5,45 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.context.SmartLifecycle; import java.time.Duration; import java.util.Arrays; @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer implements Runnable, SmartLifecycle { private final String id; private final String topic; private final Consumer consumer; - private final Thread workerThread; - private final Runnable closeCallback; + private Thread workerThread; + private volatile boolean running = false; private long consumed = 0; public ExampleConsumer( String clientId, String topic, - Consumer consumer, - Runnable closeCallback) + Consumer consumer) { this.id = clientId; this.topic = topic; this.consumer = consumer; + } + @Override + public synchronized void start() + { + if (running) + { + log.info("{} - Already running!", id); + return; + } + running = true; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); - - this.closeCallback = closeCallback; } @@ -71,8 +79,6 @@ public class ExampleConsumer implements Runnable { log.error("{} - Unexpected error, unsubscribing!", id, e); consumer.unsubscribe(); - log.info("{} - Triggering exit of application!", id); - new Thread(closeCallback).start(); } finally { @@ -94,11 +100,35 @@ public class ExampleConsumer implements Runnable } - public void shutdown() throws InterruptedException + @Override + public boolean isRunning() + { + return running; + } + + @Override + public synchronized void stop() { + if (!running) + { + log.info("{} - Not running!", id); + return; + } + + running = false; + log.info("{} - Waking up the consumer", id); consumer.wakeup(); + log.info("{} - Joining the worker thread", id); - workerThread.join(); + try + { + workerThread.join(); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + log.error("{} - Interrupted while waiting for worker thread", id, e); + } } } -- 2.39.5