this.startTime = clock.instant();
this.timeNextRetryIsDue = this.startTime;
- log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
+ log.info(
+ "{} - Back-Off requested for offset={} in {}",
+ id,
+ offset,
+ topicPartition);
backOffExecution = backOffStrategy.start();
initializeNextBackOff();
}
Duration remaining = Duration.between(now, timeNextRetryIsDue);
if (remaining.isNegative())
{
- numRetries++;
- log.info("{} - {}. retry for {}, lateness: {}", id, numRetries, topicPartition, remaining.abs());
+ log.info(
+ "{} - {}. retry for offset={} in {}, lateness: {}",
+ id,
+ numRetries,
+ offset,
+ topicPartition,
+ remaining.abs());
initializeNextBackOff();
return false;
}
else
{
- log.info("{} - Next retry for {} is due in {}", id, topicPartition, remaining);
+ log.info(
+ "{} - Next retry for offset={} in {} is due in {}",
+ id,
+ offset,
+ topicPartition,
+ remaining);
return true;
}
}
return backOffExecution == null && timeNextRetryIsDue != null;
}
- int getNumRetries()
- {
- return numRetries;
- }
-
- Duration getDurationSpendRetrying()
- {
- return Duration.between(startTime, timeNextRetryIsDue);
- }
-
void markAsCompleted()
{
if (backOffExecution != null)
{
log.info(
- "{} - retry #{} for {} succeeded after {}",
+ "{} - {}. retry for offset={} in {} succeeded after {}",
id,
numRetries,
+ offset,
topicPartition,
- Duration.between(startTime, clock.instant()));
+ Duration.between(startTime, timeNextRetryIsDue));
backOffExecution = null;
timeNextRetryIsDue = null;
}
else
{
+ numRetries++;
timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
}
}
int partition = topicPartition.partition();
if (!backOffState[partition].isStarted(record.offset()))
{
- log.info(
- "{} - First occurrence of a retryable error for offset={} in partition {} - Initializing retry!",
- id,
- record.offset(),
- partition);
+ log.info("{} - First occurrence of a retryable error: {}", id, e.toString());
backOffState[partition] = new BackOffState(id, backOffStrategy, clock, topicPartition, record.offset());
consumer.seek(topicPartition, record.offset());
break;
{
if (backOffState[partition].isCompleted())
{
- log.warn(
- "{} - Ignoring retryable error after {} attempts and {}",
- id,
- backOffState[partition].getNumRetries(),
- backOffState[partition].getDurationSpendRetrying());
+ log.warn("{} - Ignoring retryable error: {}", id, e.toString());
}
else
{
log.info(
- "{} - Retry in progress for offset={} in partition {}",
+ "{} - Retry in progress for offset={} in {}, error: {}",
id,
record.offset(),
- partition);
+ partition,
+ e.toString());
consumer.seek(topicPartition, record.offset());
break;
}
catch (NonRetriableErrorException e)
{
// Just ignore, to skip
- log.warn("{} - Ignoring non-retryable error!", id, e);
+ log.warn("{} - Ignoring non-retryable error: {}", id, e.toString());
}
backOffState[topicPartition.partition()].markAsCompleted();