import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Bean
public ExampleProducer exampleProducer(
ApplicationProperties properties,
- Producer<String, String> kafkaProducer)
+ Producer<String, String> kafkaProducer,
+ ConfigurableApplicationContext applicationContext)
{
return
new ExampleProducer(
properties.getProducerProperties().getThrottle() == null
? Duration.ofMillis(500)
: properties.getProducerProperties().getThrottle(),
- kafkaProducer);
+ kafkaProducer,
+ () -> applicationContext.close());
}
@Bean
private final Duration throttle;
private final Producer<String, String> producer;
private final Thread workerThread;
+ private final Runnable closeCallback;
private volatile boolean running = true;
private long produced = 0;
String id,
String topic,
Duration throttle,
- Producer<String, String> producer)
+ Producer<String, String> producer,
+ Runnable closeCallback)
{
this.id = id;
this.topic = topic;
workerThread = new Thread(this, "ExampleProducer Worker-Thread");
workerThread.start();
+
+ this.closeCallback = closeCallback;
}
catch (Exception e)
{
log.error("{} - Unexpected error!", id, e);
+ log.info("{} - Triggering exit of application!", id);
+ new Thread(closeCallback).start();
}
finally
{