}
}
- boolean isRetryInProgress(long offset)
+ boolean isStarted(long offset)
{
return this.offset == offset && timeNextRetryIsDue != null;
}
- boolean isUnsuccessful()
+ boolean isCompleted()
{
return backOffExecution == null && timeNextRetryIsDue != null;
}
return Duration.between(startTime, timeNextRetryIsDue);
}
- void markRetryAsSuccessful()
+ void markAsCompleted()
{
if (backOffExecution != null)
{
// Seeking to the offset of the record, that raised the exception, and
// leaving the loop afterwards, retries the record
int partition = topicPartition.partition();
- if (!backOffState[partition].isRetryInProgress(record.offset()))
+ if (!backOffState[partition].isStarted(record.offset()))
{
log.info(
"{} - First occurrence of a retryable error for offset={} in partition {} - Initializing retry!",
}
else
{
- if (backOffState[partition].isUnsuccessful())
+ if (backOffState[partition].isCompleted())
{
log.warn(
"{} - Ignoring retryable error after {} attempts and {}",
log.warn("{} - Ignoring non-retryable error!", id, e);
}
- backOffState[topicPartition.partition()].markRetryAsSuccessful();
+ backOffState[topicPartition.partition()].markAsCompleted();
}
}
}