From dabde73dd91dd7331de6cadf028c5362a2e1c905 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 23:19:40 +0100 Subject: [PATCH] =?utf8?q?Ungefangene=20Exceptions=20im=20`ExampleConsumer?= =?utf8?q?`=20l=C3=B6sen=20das=20Beenden=20der=20App=20aus?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ApplicationConfiguration.java | 7 +++++-- src/main/java/de/juplo/kafka/ExampleConsumer.java | 8 +++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0046f8f..a4856a6 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -18,13 +19,15 @@ public class ApplicationConfiguration @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, - ApplicationProperties properties) + ApplicationProperties properties, + ConfigurableApplicationContext applicationContext) { return new ExampleConsumer( properties.getClientId(), properties.getConsumerProperties().getTopic(), - kafkaConsumer); + kafkaConsumer, + () -> applicationContext.close()); } @Bean(destroyMethod = "") diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index a39c884..59da60b 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -17,6 +17,7 @@ public class ExampleConsumer implements Runnable private final String topic; private final Consumer consumer; private final Thread workerThread; + private final Runnable closeCallback; private volatile boolean running = false; private long consumed = 0; @@ -25,7 +26,8 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, String topic, - Consumer consumer) + Consumer consumer, + Runnable closeCallback) { this.id = clientId; this.topic = topic; @@ -33,6 +35,8 @@ public class ExampleConsumer implements Runnable workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); + + this.closeCallback = closeCallback; } @@ -70,6 +74,8 @@ public class ExampleConsumer implements Runnable { log.error("{} - Unexpected error, unsubscribing!", id, e.toString()); consumer.unsubscribe(); + log.info("{} - Triggering exit of application!", id); + new Thread(closeCallback).start(); } finally { -- 2.20.1