import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Bean
public ExampleConsumer exampleConsumer(
Consumer<String, String> kafkaConsumer,
- ApplicationProperties properties)
+ ApplicationProperties properties,
+ ConfigurableApplicationContext applicationContext)
{
return
new ExampleConsumer(
properties.getClientId(),
properties.getConsumerProperties().getTopic(),
- kafkaConsumer);
+ kafkaConsumer,
+ () -> applicationContext.close());
}
@Bean(destroyMethod = "")
private final String topic;
private final Consumer<String, String> consumer;
private final Thread workerThread;
+ private final Runnable closeCallback;
private volatile boolean running = false;
private long consumed = 0;
public ExampleConsumer(
String clientId,
String topic,
- Consumer<String, String> consumer)
+ Consumer<String, String> consumer,
+ Runnable closeCallback)
{
this.id = clientId;
this.topic = topic;
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
workerThread.start();
+
+ this.closeCallback = closeCallback;
}
{
log.error("{} - Unexpected error, unsubscribing!", id, e.toString());
consumer.unsubscribe();
+ log.info("{} - Triggering exit of application!", id);
+ new Thread(closeCallback).start();
}
finally
{