From f472b450132d6a6cfe2aa4cc12ef903a87ed60ab Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Fri, 1 Nov 2024 09:12:51 +0100
Subject: [PATCH] =?utf8?q?Eine=20Exception=20im=20Producer=20l=C3=B6st=20d?=
 =?utf8?q?as=20Beenden=20der=20App=20aus?=
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 .../java/de/juplo/kafka/ApplicationConfiguration.java     | 7 +++++--
 src/main/java/de/juplo/kafka/ExampleProducer.java         | 8 +++++++-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 69411512..b601458a 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -4,6 +4,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -18,7 +19,8 @@ public class ApplicationConfiguration
   @Bean
   public ExampleProducer exampleProducer(
     ApplicationProperties properties,
-    Producer<String, String> kafkaProducer)
+    Producer<String, String> kafkaProducer,
+    ConfigurableApplicationContext applicationContext)
   {
     return
       new ExampleProducer(
@@ -27,7 +29,8 @@ public class ApplicationConfiguration
         properties.getProducerProperties().getThrottle() == null
           ? Duration.ofMillis(500)
           : properties.getProducerProperties().getThrottle(),
-        kafkaProducer);
+        kafkaProducer,
+        () -> applicationContext.close());
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java
index 5a8a6625..8b935bf5 100644
--- a/src/main/java/de/juplo/kafka/ExampleProducer.java
+++ b/src/main/java/de/juplo/kafka/ExampleProducer.java
@@ -15,6 +15,7 @@ public class ExampleProducer implements Runnable
   private final Duration throttle;
   private final Producer<String, String> producer;
   private final Thread workerThread;
+  private final Runnable closeCallback;
 
   private volatile boolean running = true;
   private long produced = 0;
@@ -24,7 +25,8 @@ public class ExampleProducer implements Runnable
     String id,
     String topic,
     Duration throttle,
-    Producer<String, String> producer)
+    Producer<String, String> producer,
+    Runnable closeCallback)
   {
     this.id = id;
     this.topic = topic;
@@ -33,6 +35,8 @@ public class ExampleProducer implements Runnable
 
     workerThread = new Thread(this, "ExampleProducer Worker-Thread");
     workerThread.start();
+
+    this.closeCallback = closeCallback;
   }
 
 
@@ -63,6 +67,8 @@ public class ExampleProducer implements Runnable
     catch (Exception e)
     {
       log.error("{} - Unexpected error!", id, e);
+      log.info("{} - Triggering exit of application!", id);
+      new Thread(closeCallback).start();
     }
     finally
     {
-- 
2.20.1