From 6dc9d50b8af6253042ed08c935703a2bd15c978c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 10 Jan 2025 10:44:48 +0100 Subject: [PATCH] WIP:refinement-BackOffState --- .../java/de/juplo/kafka/BackOffState.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java index 92d08df0..556b3b10 100644 --- a/src/main/java/de/juplo/kafka/BackOffState.java +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -20,10 +20,12 @@ class BackOffState @Getter private final long offset; private final Instant startTime; + private final BackOffExecution backOffExecution; - private BackOffExecution backOffExecution; private int numRetries = 0; private Instant timeNextRetryIsDue; + private boolean inProgress; + private boolean successful = false; BackOffState() @@ -33,6 +35,9 @@ class BackOffState topicPartition = null; offset = -1; startTime = null; + + inProgress = false; + backOffExecution = null; } BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset) @@ -44,13 +49,14 @@ class BackOffState this.startTime = clock.instant(); log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime); + inProgress = true; backOffExecution = backOffStrategy.start(); initializeNextBackOff(); } boolean isWaitingForNextRetry() { - if (backOffExecution == null) + if (!inProgress) { return false; } @@ -71,12 +77,12 @@ class BackOffState boolean isRetryInProgress(long offset) { - return this.offset == offset && timeNextRetryIsDue != null; + return this.offset == offset && inProgress; } boolean isUnsuccessful() { - return backOffExecution == null && timeNextRetryIsDue != null; + return !successful && !inProgress; } int getNumRetries() @@ -91,17 +97,19 @@ class BackOffState void markRetryAsSuccessful() { - if (backOffExecution != null) + if (inProgress) { + Instant now = clock.instant(); log.info( "{} - retry #{} for {} succeeded after {}", id, numRetries, topicPartition, - Duration.between(startTime, clock.instant())); + Duration.between(startTime, now)); - backOffExecution = null; - timeNextRetryIsDue = null; + inProgress = false; + successful = true; + timeNextRetryIsDue = now; } } @@ -111,7 +119,7 @@ class BackOffState if (backOffMillis == BackOffExecution.STOP) { - backOffExecution = null; + inProgress = false; } else { -- 2.20.1