@Getter
private final long offset;
private final Instant startTime;
+ private final BackOffExecution backOffExecution;
- private BackOffExecution backOffExecution;
private int numRetries = 0;
private Instant timeNextRetryIsDue;
+ private boolean inProgress;
+ private boolean successful = false;
BackOffState()
topicPartition = null;
offset = -1;
startTime = null;
+
+ inProgress = false;
+ backOffExecution = null;
}
BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset)
this.startTime = clock.instant();
log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
+ inProgress = true;
backOffExecution = backOffStrategy.start();
initializeNextBackOff();
}
boolean isWaitingForNextRetry()
{
- if (backOffExecution == null)
+ if (!inProgress)
{
return false;
}
boolean isRetryInProgress(long offset)
{
- return this.offset == offset && timeNextRetryIsDue != null;
+ return this.offset == offset && inProgress;
}
boolean isUnsuccessful()
{
- return backOffExecution == null && timeNextRetryIsDue != null;
+ return !successful && !inProgress;
}
int getNumRetries()
void markRetryAsSuccessful()
{
- if (backOffExecution != null)
+ if (inProgress)
{
+ Instant now = clock.instant();
log.info(
"{} - retry #{} for {} succeeded after {}",
id,
numRetries,
topicPartition,
- Duration.between(startTime, clock.instant()));
+ Duration.between(startTime, now));
- backOffExecution = null;
- timeNextRetryIsDue = null;
+ inProgress = false;
+ successful = true;
+ timeNextRetryIsDue = now;
}
}
if (backOffMillis == BackOffExecution.STOP)
{
- backOffExecution = null;
+ inProgress = false;
}
else
{