From 4cff65edeb302834b5b30c42d91e637f113f17ba Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Fri, 10 Jan 2025 10:46:59 +0100
Subject: [PATCH] FIX:double-mark-as-successful

---
 src/main/java/de/juplo/kafka/BackOffState.java    | 4 ++--
 src/main/java/de/juplo/kafka/ExampleConsumer.java | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java
index 556b3b10..b8a226b5 100644
--- a/src/main/java/de/juplo/kafka/BackOffState.java
+++ b/src/main/java/de/juplo/kafka/BackOffState.java
@@ -95,9 +95,9 @@ class BackOffState
     return Duration.between(startTime, timeNextRetryIsDue);
   }
 
-  void markRetryAsSuccessful()
+  void markRetryAsSuccessful(long offset)
   {
-    if (inProgress)
+    if (inProgress && this.offset == offset)
     {
       Instant now = clock.instant();
       log.info(
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index 2ef4b0ad..51e69ffe 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -185,7 +185,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
                 log.warn("{} - Ignoring non-retryable error!", id, e);
               }
 
-              backOffState[topicPartition.partition()].markRetryAsSuccessful();
+              backOffState[topicPartition.partition()].markRetryAsSuccessful(record.offset());
             }
           }
         }
-- 
2.20.1