From: Kai Moritz Date: Sat, 15 Mar 2025 17:58:30 +0000 (+0100) Subject: 3. Schritt Live-Umbau: Exception beendet App X-Git-Tag: spring/spring-consumer--livecoding--schritte--2025-05-lvm~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d092bd95971e8a6c35f74fb3dea4c1db818a95aa;p=demos%2Fkafka%2Ftraining 3. Schritt Live-Umbau: Exception beendet App --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 49d7891..3e981b3 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -17,13 +18,15 @@ public class ApplicationConfiguration @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, - ApplicationProperties properties) + ApplicationProperties properties, + ConfigurableApplicationContext applicationContext) { return new ExampleConsumer( properties.getClientId(), properties.getTopic(), - kafkaConsumer); + kafkaConsumer, + () -> applicationContext.close()); } @Bean(destroyMethod = "") diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 66665be..e82719e 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -17,13 +17,15 @@ public class ExampleConsumer implements Runnable private final String topic; private final Consumer consumer; private final Thread workerThread; + private final Runnable closeCallback; private long consumed = 0; public ExampleConsumer( String clientId, String topic, - Consumer consumer) + Consumer consumer, + Runnable closeCallback) { this.id = clientId; this.topic = topic; @@ -31,6 +33,8 @@ public class ExampleConsumer implements Runnable workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); + + this.closeCallback = closeCallback; } @@ -66,6 +70,8 @@ public class ExampleConsumer implements Runnable { log.error("{} - Unexpected error, unsubscribing!", id, e); consumer.unsubscribe(); + log.info("{} - Triggering exit of application!", id); + new Thread(closeCallback).start(); } finally {