From: Kai Moritz Date: Fri, 10 Jan 2025 09:44:48 +0000 (+0100) Subject: WIP:refinement-BackOffState X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=6dc9d50b8af6253042ed08c935703a2bd15c978c;p=demos%2Fkafka%2Ftraining WIP:refinement-BackOffState --- diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java index 92d08df0..556b3b10 100644 --- a/src/main/java/de/juplo/kafka/BackOffState.java +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -20,10 +20,12 @@ class BackOffState @Getter private final long offset; private final Instant startTime; + private final BackOffExecution backOffExecution; - private BackOffExecution backOffExecution; private int numRetries = 0; private Instant timeNextRetryIsDue; + private boolean inProgress; + private boolean successful = false; BackOffState() @@ -33,6 +35,9 @@ class BackOffState topicPartition = null; offset = -1; startTime = null; + + inProgress = false; + backOffExecution = null; } BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset) @@ -44,13 +49,14 @@ class BackOffState this.startTime = clock.instant(); log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime); + inProgress = true; backOffExecution = backOffStrategy.start(); initializeNextBackOff(); } boolean isWaitingForNextRetry() { - if (backOffExecution == null) + if (!inProgress) { return false; } @@ -71,12 +77,12 @@ class BackOffState boolean isRetryInProgress(long offset) { - return this.offset == offset && timeNextRetryIsDue != null; + return this.offset == offset && inProgress; } boolean isUnsuccessful() { - return backOffExecution == null && timeNextRetryIsDue != null; + return !successful && !inProgress; } int getNumRetries() @@ -91,17 +97,19 @@ class BackOffState void markRetryAsSuccessful() { - if (backOffExecution != null) + if (inProgress) { + Instant now = clock.instant(); log.info( "{} - retry #{} for {} succeeded after {}", id, numRetries, topicPartition, - Duration.between(startTime, clock.instant())); + Duration.between(startTime, now)); - backOffExecution = null; - timeNextRetryIsDue = null; + inProgress = false; + successful = true; + timeNextRetryIsDue = now; } } @@ -111,7 +119,7 @@ class BackOffState if (backOffMillis == BackOffExecution.STOP) { - backOffExecution = null; + inProgress = false; } else {