offsetRecorder.recordOffset(record.partition(), record.offset());
handler.accept(record);
+ consumed++;
+ }
- long now = clock.millis();
- if (now - lastCommit >= autoCommitIntervalMs)
+ long now = clock.millis();
+ if (now - lastCommit >= autoCommitIntervalMs)
+ {
+ Map<TopicPartition, OffsetAndMetadata> pendingOffsets = offsetRecorder.getAndClearOffsets();
+ if (pendingOffsets.isEmpty())
{
+ log.info("No pending offsets to commit after {}", Duration.ofMillis(now - lastCommit));
+ }
+ else
+ {
+ log.info("Committing pending offsets after {}", Duration.ofMillis(now - lastCommit));
producer.sendOffsetsToTransaction(
- offsetRecorder.getOffsets(),
+ pendingOffsets,
consumer.groupMetadata());
producer.commitTransaction();
+ lastCommit = now;
producer.beginTransaction();
}
- consumed++;
}
}
}
{
log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
producer.sendOffsetsToTransaction(
- offsetRecorder.getOffsets(),
+ offsetRecorder.getAndClearOffsets(),
consumer.groupMetadata());
producer.commitTransaction();
shutdown();
e.getCause().toString());
producer.sendOffsetsToTransaction(
- offsetRecorder.getOffsets(),
+ offsetRecorder.getAndClearOffsets(),
consumer.groupMetadata());
producer.commitTransaction();
shutdown(e);
offsets[partition] = offset + 1;
}
- Map<TopicPartition, OffsetAndMetadata> getOffsets()
+ Map<TopicPartition, OffsetAndMetadata> getAndClearOffsets()
{
Map<TopicPartition, OffsetAndMetadata> recordedOffsets = new HashMap<>();
{
if (activePartitions[i] && offsets[i] > -1)
{
- log.info("Offset for partition {} is {}", i, offsets[i]);
+ log.info("Offset {} of partition {} is pending for a commit", offsets[i], i);
recordedOffsets.put(
new TopicPartition(topic, i),
new OffsetAndMetadata(offsets[i]));
+ offsets[i] = -1;
}
}