log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
- producer.commitTransaction();
+ commit();
}
private void commitIfNecessary()
Instant now = clock.instant();
if (!now.isBefore(lastCommit.plus(commitInterval)))
{
- producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata());
- commit();
- lastCommit = now;
+ commit(now);
}
}
}
private void commit()
+ {
+ commit(clock.instant());
+ }
+
+ private void commit(Instant now)
+ {
+ log.info("{} - Sending offsets to transaction and commiting", id);
+ producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata());
+ commitTx();
+ lastCommit = now;
+ }
+
+ private void commitTx()
{
producer.commitTransaction();
producer.beginTransaction();
@Override
public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
+ commit();
partitions
.stream()
.filter(partition -> partition.topic().equals(topic))