From 0de7a5a8ecb8fbabf49b7145285a2245ca14eb54 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 1 Nov 2024 09:12:51 +0100 Subject: [PATCH] =?utf8?q?Eine=20Exception=20im=20Producer=20l=C3=B6st=20d?= =?utf8?q?as=20Beenden=20der=20App=20aus?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ApplicationConfiguration.java | 7 +++++-- src/main/java/de/juplo/kafka/ExampleProducer.java | 8 +++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 2b69696..3d775f0 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,6 +4,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -18,7 +19,8 @@ public class ApplicationConfiguration @Bean public ExampleProducer exampleProducer( ApplicationProperties properties, - Producer kafkaProducer) + Producer kafkaProducer, + ConfigurableApplicationContext applicationContext) { return new ExampleProducer( @@ -27,7 +29,8 @@ public class ApplicationConfiguration properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) : properties.getProducerProperties().getThrottle(), - kafkaProducer); + kafkaProducer, + () -> applicationContext.close()); } @Bean diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 0842faa..2190c8d 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -15,6 +15,7 @@ public class ExampleProducer implements Runnable private final Duration throttle; private final Producer producer; private final Thread workerThread; + private final Runnable closeCallback; private volatile boolean running = true; private long produced = 0; @@ -24,7 +25,8 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer) + Producer producer, + Runnable closeCallback) { this.id = id; this.topic = topic; @@ -33,6 +35,8 @@ public class ExampleProducer implements Runnable workerThread = new Thread(this, "ExampleProducer Worker-Thread"); workerThread.start(); + + this.closeCallback = closeCallback; } @@ -63,6 +67,8 @@ public class ExampleProducer implements Runnable catch (Exception e) { log.error("{} - Unexpected error!", id, e); + log.info("{} - Triggering exit of application!", id); + new Thread(closeCallback).start(); } finally { -- 2.20.1