]> juplo.de Git - demos/kafka/training/commitdiff
FIX:double-mark-as-successful
authorKai Moritz <kai@juplo.de>
Fri, 10 Jan 2025 09:46:59 +0000 (10:46 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 10 Jan 2025 09:46:59 +0000 (10:46 +0100)
src/main/java/de/juplo/kafka/BackOffState.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 556b3b10463e02c51b9a6d2e82d9bff872d498f4..b8a226b5bf725d61c3619e0cefeb0a951786aaa8 100644 (file)
@@ -95,9 +95,9 @@ class BackOffState
     return Duration.between(startTime, timeNextRetryIsDue);
   }
 
-  void markRetryAsSuccessful()
+  void markRetryAsSuccessful(long offset)
   {
-    if (inProgress)
+    if (inProgress && this.offset == offset)
     {
       Instant now = clock.instant();
       log.info(
index 2ef4b0ad0ca42c319960efca12e1784763bcc768..51e69ffed91f51b7b890f0cccfb03b4093cb1fff 100644 (file)
@@ -185,7 +185,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
                 log.warn("{} - Ignoring non-retryable error!", id, e);
               }
 
-              backOffState[topicPartition.partition()].markRetryAsSuccessful();
+              backOffState[topicPartition.partition()].markRetryAsSuccessful(record.offset());
             }
           }
         }