Eine Exception im Producer löst das Beenden der App aus
authorKai Moritz <kai@juplo.de>
Fri, 1 Nov 2024 08:12:51 +0000 (09:12 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 8 Nov 2024 17:06:36 +0000 (18:06 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleProducer.java

index 2b69696..3d775f0 100644 (file)
@@ -4,6 +4,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -18,7 +19,8 @@ public class ApplicationConfiguration
   @Bean
   public ExampleProducer exampleProducer(
       ApplicationProperties properties,
-      Producer<String, String> kafkaProducer)
+      Producer<String, String> kafkaProducer,
+      ConfigurableApplicationContext applicationContext)
   {
     return
         new ExampleProducer(
@@ -27,7 +29,8 @@ public class ApplicationConfiguration
             properties.getProducerProperties().getThrottle() == null
               ? Duration.ofMillis(500)
               : properties.getProducerProperties().getThrottle(),
-            kafkaProducer);
+            kafkaProducer,
+            () -> applicationContext.close());
   }
 
   @Bean
index 0842faa..2190c8d 100644 (file)
@@ -15,6 +15,7 @@ public class ExampleProducer implements Runnable
   private final Duration throttle;
   private final Producer<String, String> producer;
   private final Thread workerThread;
+  private final Runnable closeCallback;
 
   private volatile boolean running = true;
   private long produced = 0;
@@ -24,7 +25,8 @@ public class ExampleProducer implements Runnable
     String id,
     String topic,
     Duration throttle,
-    Producer<String, String> producer)
+    Producer<String, String> producer,
+    Runnable closeCallback)
   {
     this.id = id;
     this.topic = topic;
@@ -33,6 +35,8 @@ public class ExampleProducer implements Runnable
 
     workerThread = new Thread(this, "ExampleProducer Worker-Thread");
     workerThread.start();
+
+    this.closeCallback = closeCallback;
   }
 
 
@@ -63,6 +67,8 @@ public class ExampleProducer implements Runnable
     catch (Exception e)
     {
       log.error("{} - Unexpected error!", id, e);
+      log.info("{} - Triggering exit of application!", id);
+      new Thread(closeCallback).start();
     }
     finally
     {