From: Kai Moritz Date: Fri, 20 Mar 2026 16:44:53 +0000 (+0100) Subject: Sauberen Shutdown verbessert: Shutdown wird über `SmartLifecycle` initiiert X-Git-Tag: consumer/spring-consumer--2026-03-20--19-06 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=a523851ae5c9717d06fa7d7e2ade73ffb6cfb8cc;p=demos%2Fkafka%2Ftraining Sauberen Shutdown verbessert: Shutdown wird über `SmartLifecycle` initiiert --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index ea6b64ea..a45c1505 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -25,8 +25,7 @@ public class ApplicationConfiguration new ExampleConsumer( properties.getClientId(), properties.getConsumerProperties().getTopic(), - kafkaConsumer, - () -> applicationContext.close()); + kafkaConsumer); } @Bean(destroyMethod = "") diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 1f5a5706..0c06b10f 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -5,37 +5,45 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.context.SmartLifecycle; import java.time.Duration; import java.util.Arrays; @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer implements Runnable, SmartLifecycle { private final String id; private final String topic; private final Consumer consumer; - private final Thread workerThread; - private final Runnable closeCallback; + private Thread workerThread; + private volatile boolean running = false; private long consumed = 0; public ExampleConsumer( String clientId, String topic, - Consumer consumer, - Runnable closeCallback) + Consumer consumer) { this.id = clientId; this.topic = topic; this.consumer = consumer; + } + @Override + public synchronized void start() + { + if (running) + { + log.info("{} - Already running!", id); + return; + } + running = true; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); - - this.closeCallback = closeCallback; } @@ -71,8 +79,6 @@ public class ExampleConsumer implements Runnable { log.error("{} - Unexpected error, unsubscribing!", id, e); consumer.unsubscribe(); - log.info("{} - Triggering exit of application!", id); - new Thread(closeCallback).start(); } finally { @@ -94,11 +100,35 @@ public class ExampleConsumer implements Runnable } - public void shutdown() throws InterruptedException + @Override + public boolean isRunning() + { + return running; + } + + @Override + public synchronized void stop() { + if (!running) + { + log.info("{} - Not running!", id); + return; + } + + running = false; + log.info("{} - Waking up the consumer", id); consumer.wakeup(); + log.info("{} - Joining the worker thread", id); - workerThread.join(); + try + { + workerThread.join(); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + log.error("{} - Interrupted while waiting for worker thread", id, e); + } } }