@Bean
public ExampleConsumer exampleConsumer(
Consumer<String, String> kafkaConsumer,
- Producer<String, String> kafkaProducer,
ApplicationProperties properties,
Clock clock,
ConfigurableApplicationContext applicationContext)
{
+ Producer<String, String> kafkaProducer = kafkaProducer(properties);
return
new ExampleConsumer(
properties.getClientId(),
return new KafkaConsumer<>(props);
}
- @Bean
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
- sendOffsets();
log.info("{} - Final commit for transaction", id);
producer.commitTransaction();
+ log.info("{} - Closing the producer", id);
+ producer.close();
}
private void commitIfNecessary()