int partition = topicPartition.partition();
if (!backOffState[partition].isRetryInProgress(record.offset()))
{
+ log.info(
+ "{} - First occurrence of a retryable error for offset={} in partition {} - Initializing retry!",
+ id,
+ record.offset(),
+ partition);
backOffState[partition] = new BackOffState(topicPartition, record.offset());
partitionHasRetryableError = true;
consumer.seek(topicPartition, record.offset());
}
else
{
+ log.info(
+ "{} - Retry in progress for offset={} in partition {}",
+ id,
+ record.offset(),
+ partition);
consumer.seek(topicPartition, record.offset());
partitionHasRetryableError = true;
break;