try
{
- log.info("C - Subscribing to topic test");
+ log.info("Subscribing to topic {}", inputTopic);
consumer.subscribe(
Arrays.asList(inputTopic),
new TransactionalConsumerRebalanceListener());
{
log.info("Closing consumer");
consumer.close();
- log.info("C - DONE!");
+ log.info("Closing producer");
+ producer.close();
+ log.info("Exiting!");
}
}
private void commitTransaction()
{
log.info("Committing transaction");
+ producer.sendOffsetsToTransaction(
+ consumer.po,
+ consumer.groupMetadata());
producer.commitTransaction();
}