From 2d9be8876fbcb7934ccc16c3b8cfa82f1962ee6a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 15 Mar 2025 18:58:03 +0100 Subject: [PATCH] =?utf8?q?2.=20Schritt=20Live-Umbau:=20Fix=20f=C3=BCr=20Co?= =?utf8?q?ncurrent=20Modification?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ApplicationConfiguration.java | 2 +- src/main/java/de/juplo/kafka/ExampleConsumer.java | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 55c42f61..49d78910 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -26,7 +26,7 @@ public class ApplicationConfiguration kafkaConsumer); } - @Bean + @Bean(destroyMethod = "") public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 1d9453b3..66665bee 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -85,5 +85,14 @@ public class ExampleConsumer implements Runnable consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); } + + + public void shutdown() throws InterruptedException + { + log.info("{} - Waking up the consumer", id); + consumer.wakeup(); + log.info("{} - Joining the worker thread", id); + workerThread.join(); + } } -- 2.20.1