]> juplo.de Git - demos/kafka/training/commitdiff
Überprüfung des ``BackOffState``-Zustands berücksichtigt immer den Offset
authorKai Moritz <kai@juplo.de>
Sat, 11 Jan 2025 09:53:15 +0000 (10:53 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 21 Jan 2025 20:25:04 +0000 (21:25 +0100)
src/main/java/de/juplo/kafka/BackOffState.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/test/java/de/juplo/kafka/BackOffStateTest.java

index 0015ebec586f8ec91de15d55b1de75699f47c0e9..4981bb6c45fe314529d23eb25c0bd2f6cd93b858 100644 (file)
@@ -90,9 +90,9 @@ class BackOffState
     return this.offset == offset && backOffExecution != null;
   }
 
-  boolean isCompleted()
+  boolean isCompleted(long offset)
   {
-    return timeNextRetryIsDue == null;
+    return this.offset == offset && timeNextRetryIsDue == null;
   }
 
   void markAsCompleted()
index b48baaa4ebaeaf1c1a43abdd446bce4e02a0281f..9e73b1196443f09432ae7123543c303bf891c4c3 100644 (file)
@@ -155,7 +155,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
                 }
                 else
                 {
-                  if (backOffState[partition].isCompleted())
+                  if (backOffState[partition].isCompleted(record.offset()))
                   {
                     log.warn("{} - Ignoring retryable error: {}", id, e.toString());
                   }
index 624b354b5ce3acc04cd7d26a778aeb8964471506..338631c156eb7df599f31eea78972e617c878ead 100644 (file)
@@ -108,7 +108,7 @@ class BackOffStateTest
     // WHEN
 
     // THEN
-    assertThat(backOffState.isCompleted()).isTrue();
+    assertThat(backOffState.isCompleted(OFFSET)).isTrue();
   }
 
 
@@ -163,7 +163,7 @@ class BackOffStateTest
     backOffState.isWaitingForNextRetry();
 
     // THEN
-    assertThat(backOffState.isCompleted()).isFalse();
+    assertThat(backOffState.isCompleted(OFFSET)).isFalse();
   }
 
   @Test
@@ -211,7 +211,7 @@ class BackOffStateTest
     backOffState.isWaitingForNextRetry();
 
     // THEN
-    assertThat(backOffState.isCompleted()).isFalse();
+    assertThat(backOffState.isCompleted(OFFSET)).isFalse();
   }
 
   @Test
@@ -259,7 +259,7 @@ class BackOffStateTest
     backOffState.isWaitingForNextRetry();
 
     // THEN
-    assertThat(backOffState.isCompleted()).isTrue();
+    assertThat(backOffState.isCompleted(OFFSET)).isTrue();
   }
 
   @Test