WIP:refinement-BackOffState
authorKai Moritz <kai@juplo.de>
Fri, 10 Jan 2025 09:44:48 +0000 (10:44 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 10 Jan 2025 09:44:48 +0000 (10:44 +0100)
src/main/java/de/juplo/kafka/BackOffState.java

index 92d08df..556b3b1 100644 (file)
@@ -20,10 +20,12 @@ class BackOffState
   @Getter
   private final long offset;
   private final Instant startTime;
+  private final BackOffExecution backOffExecution;
 
-  private BackOffExecution backOffExecution;
   private int numRetries = 0;
   private Instant timeNextRetryIsDue;
+  private boolean inProgress;
+  private boolean successful = false;
 
 
   BackOffState()
@@ -33,6 +35,9 @@ class BackOffState
     topicPartition = null;
     offset = -1;
     startTime = null;
+
+    inProgress = false;
+    backOffExecution = null;
   }
 
   BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset)
@@ -44,13 +49,14 @@ class BackOffState
     this.startTime = clock.instant();
 
     log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
+    inProgress = true;
     backOffExecution = backOffStrategy.start();
     initializeNextBackOff();
   }
 
   boolean isWaitingForNextRetry()
   {
-    if (backOffExecution == null)
+    if (!inProgress)
     {
       return false;
     }
@@ -71,12 +77,12 @@ class BackOffState
 
   boolean isRetryInProgress(long offset)
   {
-    return this.offset == offset && timeNextRetryIsDue != null;
+    return this.offset == offset && inProgress;
   }
 
   boolean isUnsuccessful()
   {
-    return backOffExecution == null && timeNextRetryIsDue != null;
+    return !successful && !inProgress;
   }
 
   int getNumRetries()
@@ -91,17 +97,19 @@ class BackOffState
 
   void markRetryAsSuccessful()
   {
-    if (backOffExecution != null)
+    if (inProgress)
     {
+      Instant now = clock.instant();
       log.info(
         "{} - retry #{} for {} succeeded after {}",
         id,
         numRetries,
         topicPartition,
-        Duration.between(startTime, clock.instant()));
+        Duration.between(startTime, now));
 
-      backOffExecution = null;
-      timeNextRetryIsDue = null;
+      inProgress = false;
+      successful = true;
+      timeNextRetryIsDue = now;
     }
   }
 
@@ -111,7 +119,7 @@ class BackOffState
 
     if (backOffMillis == BackOffExecution.STOP)
     {
-      backOffExecution = null;
+      inProgress = false;
     }
     else
     {