import org.apache.kafka.clients.consumer.StickyAssignor;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
   @Bean
   public ExampleConsumer exampleConsumer(
       Consumer<String, String> kafkaConsumer,
-      ApplicationProperties properties)
+      ApplicationProperties properties,
+      ConfigurableApplicationContext applicationContext)
   {
     return
         new ExampleConsumer(
             properties.getClientId(),
             properties.getConsumerProperties().getTopic(),
-            kafkaConsumer);
+            kafkaConsumer,
+            () -> applicationContext.close());
   }
 
   @Bean(destroyMethod = "")
 
   private final String topic;
   private final Consumer<String, String> consumer;
   private final Thread workerThread;
+  private final Runnable closeCallback;
 
   private volatile boolean running = false;
   private long consumed = 0;
   public ExampleConsumer(
     String clientId,
     String topic,
-    Consumer<String, String> consumer)
+    Consumer<String, String> consumer,
+    Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
+
+    this.closeCallback = closeCallback;
   }
 
 
     {
       log.error("{} - Unexpected error, unsubscribing!", id, e);
       consumer.unsubscribe();
+      log.info("{} - Triggering exit of application!", id);
+      new Thread(closeCallback).start();
     }
     finally
     {