Überprüfung des ``BackOffState``-Zustands berücksichtigt immer den Offset
authorKai Moritz <kai@juplo.de>
Sat, 11 Jan 2025 09:53:15 +0000 (10:53 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 6 Feb 2025 17:04:39 +0000 (18:04 +0100)
src/main/java/de/juplo/kafka/BackOffState.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/test/java/de/juplo/kafka/BackOffStateTest.java

index 0015ebe..4981bb6 100644 (file)
@@ -90,9 +90,9 @@ class BackOffState
     return this.offset == offset && backOffExecution != null;
   }
 
-  boolean isCompleted()
+  boolean isCompleted(long offset)
   {
-    return timeNextRetryIsDue == null;
+    return this.offset == offset && timeNextRetryIsDue == null;
   }
 
   void markAsCompleted()
index b48baaa..9e73b11 100644 (file)
@@ -155,7 +155,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
                 }
                 else
                 {
-                  if (backOffState[partition].isCompleted())
+                  if (backOffState[partition].isCompleted(record.offset()))
                   {
                     log.warn("{} - Ignoring retryable error: {}", id, e.toString());
                   }
index 624b354..338631c 100644 (file)
@@ -108,7 +108,7 @@ class BackOffStateTest
     // WHEN
 
     // THEN
-    assertThat(backOffState.isCompleted()).isTrue();
+    assertThat(backOffState.isCompleted(OFFSET)).isTrue();
   }
 
 
@@ -163,7 +163,7 @@ class BackOffStateTest
     backOffState.isWaitingForNextRetry();
 
     // THEN
-    assertThat(backOffState.isCompleted()).isFalse();
+    assertThat(backOffState.isCompleted(OFFSET)).isFalse();
   }
 
   @Test
@@ -211,7 +211,7 @@ class BackOffStateTest
     backOffState.isWaitingForNextRetry();
 
     // THEN
-    assertThat(backOffState.isCompleted()).isFalse();
+    assertThat(backOffState.isCompleted(OFFSET)).isFalse();
   }
 
   @Test
@@ -259,7 +259,7 @@ class BackOffStateTest
     backOffState.isWaitingForNextRetry();
 
     // THEN
-    assertThat(backOffState.isCompleted()).isTrue();
+    assertThat(backOffState.isCompleted(OFFSET)).isTrue();
   }
 
   @Test