3. Schritt Live-Umbau: Exception beendet App
authorKai Moritz <kai@juplo.de>
Sat, 15 Mar 2025 17:58:30 +0000 (18:58 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 15 Mar 2025 19:22:43 +0000 (20:22 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 49d7891..3e981b3 100644 (file)
@@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -17,13 +18,15 @@ public class ApplicationConfiguration
   @Bean
   public ExampleConsumer exampleConsumer(
     Consumer<String, String> kafkaConsumer,
-    ApplicationProperties properties)
+    ApplicationProperties properties,
+    ConfigurableApplicationContext applicationContext)
   {
     return
       new ExampleConsumer(
         properties.getClientId(),
         properties.getTopic(),
-        kafkaConsumer);
+        kafkaConsumer,
+        () -> applicationContext.close());
   }
 
   @Bean(destroyMethod = "")
index 66665be..e82719e 100644 (file)
@@ -17,13 +17,15 @@ public class ExampleConsumer implements Runnable
   private final String topic;
   private final Consumer<String, String> consumer;
   private final Thread workerThread;
+  private final Runnable closeCallback;
 
   private long consumed = 0;
 
   public ExampleConsumer(
     String clientId,
     String topic,
-    Consumer<String, String> consumer)
+    Consumer<String, String> consumer,
+    Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
@@ -31,6 +33,8 @@ public class ExampleConsumer implements Runnable
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
+
+    this.closeCallback = closeCallback;
   }
 
 
@@ -66,6 +70,8 @@ public class ExampleConsumer implements Runnable
     {
       log.error("{} - Unexpected error, unsubscribing!", id, e);
       consumer.unsubscribe();
+      log.info("{} - Triggering exit of application!", id);
+      new Thread(closeCallback).start();
     }
     finally
     {