Ungefangene Exceptions im `ExampleConsumer` lösen das Beenden der App aus
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 22:19:40 +0000 (23:19 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 9 Nov 2024 18:20:30 +0000 (19:20 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 0046f8f..a4856a6 100644 (file)
@@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.StickyAssignor;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -18,13 +19,15 @@ public class ApplicationConfiguration
   @Bean
   public ExampleConsumer exampleConsumer(
       Consumer<String, String> kafkaConsumer,
-      ApplicationProperties properties)
+      ApplicationProperties properties,
+      ConfigurableApplicationContext applicationContext)
   {
     return
         new ExampleConsumer(
             properties.getClientId(),
             properties.getConsumerProperties().getTopic(),
-            kafkaConsumer);
+            kafkaConsumer,
+            () -> applicationContext.close());
   }
 
   @Bean(destroyMethod = "")
index 3206067..f832b45 100644 (file)
@@ -17,6 +17,7 @@ public class ExampleConsumer implements Runnable
   private final String topic;
   private final Consumer<String, String> consumer;
   private final Thread workerThread;
+  private final Runnable closeCallback;
 
   private volatile boolean running = false;
   private long consumed = 0;
@@ -25,7 +26,8 @@ public class ExampleConsumer implements Runnable
   public ExampleConsumer(
     String clientId,
     String topic,
-    Consumer<String, String> consumer)
+    Consumer<String, String> consumer,
+    Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
@@ -33,6 +35,8 @@ public class ExampleConsumer implements Runnable
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
+
+    this.closeCallback = closeCallback;
   }
 
 
@@ -70,6 +74,8 @@ public class ExampleConsumer implements Runnable
     {
       log.error("{} - Unexpected error, unsubscribing!", id, e);
       consumer.unsubscribe();
+      log.info("{} - Triggering exit of application!", id);
+      new Thread(closeCallback).start();
     }
     finally
     {