recordsForPartition.size(),
topicPartition);
- boolean partitionHasRetryableError = false;
-
for (ConsumerRecord<String, Long> record : recordsForPartition)
{
if (abortCurrentPoll)
record.offset(),
partition);
backOffState[partition] = new BackOffState(topicPartition, record.offset());
- partitionHasRetryableError = true;
consumer.seek(topicPartition, record.offset());
break;
}
record.offset(),
partition);
consumer.seek(topicPartition, record.offset());
- partitionHasRetryableError = true;
break;
}
}
log.warn("{} - Ignoring non-retryable error!", id, e);
}
- if (!partitionHasRetryableError)
- {
- backOffState[topicPartition.partition()].markRetryAsSuccessful();
- }
+ backOffState[topicPartition.partition()].markRetryAsSuccessful();
}
}
}