From 4cff65edeb302834b5b30c42d91e637f113f17ba Mon Sep 17 00:00:00 2001 From: Kai Moritz <kai@juplo.de> Date: Fri, 10 Jan 2025 10:46:59 +0100 Subject: [PATCH] FIX:double-mark-as-successful --- src/main/java/de/juplo/kafka/BackOffState.java | 4 ++-- src/main/java/de/juplo/kafka/ExampleConsumer.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java index 556b3b10..b8a226b5 100644 --- a/src/main/java/de/juplo/kafka/BackOffState.java +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -95,9 +95,9 @@ class BackOffState return Duration.between(startTime, timeNextRetryIsDue); } - void markRetryAsSuccessful() + void markRetryAsSuccessful(long offset) { - if (inProgress) + if (inProgress && this.offset == offset) { Instant now = clock.instant(); log.info( diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 2ef4b0ad..51e69ffe 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -185,7 +185,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable log.warn("{} - Ignoring non-retryable error!", id, e); } - backOffState[topicPartition.partition()].markRetryAsSuccessful(); + backOffState[topicPartition.partition()].markRetryAsSuccessful(record.offset()); } } } -- 2.20.1