log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
- commit();
+ sendOffsets();
+ log.info("{} - Final commit for transaction", id);
+ producer.commitTransaction();
}
private void commitIfNecessary()
private void commit(Instant now)
{
- log.info("{} - Sending offsets to transaction and commiting", id);
- producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata());
+ sendOffsets();
commitTx();
lastCommit = now;
}
+ private void sendOffsets()
+ {
+ log.info("{} - Sending offsets to transaction", id);
+ producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata());
+ }
+
private void commitTx()
{
+ log.info("{} - Rolling transaction", id);
producer.commitTransaction();
producer.beginTransaction();
}