From: Kai Moritz Date: Sat, 15 Mar 2025 17:58:03 +0000 (+0100) Subject: 2. Schritt Live-Umbau: Fix für Concurrent Modification X-Git-Tag: spring/spring-consumer--livecoding--2025-03-18--19-42~2 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=2d9be8876fbcb7934ccc16c3b8cfa82f1962ee6a;p=demos%2Fkafka%2Ftraining 2. Schritt Live-Umbau: Fix für Concurrent Modification --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 55c42f61..49d78910 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -26,7 +26,7 @@ public class ApplicationConfiguration kafkaConsumer); } - @Bean + @Bean(destroyMethod = "") public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 1d9453b3..66665bee 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -85,5 +85,14 @@ public class ExampleConsumer implements Runnable consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); } + + + public void shutdown() throws InterruptedException + { + log.info("{} - Waking up the consumer", id); + consumer.wakeup(); + log.info("{} - Joining the worker thread", id); + workerThread.join(); + } }