From: Kai Moritz Date: Fri, 10 Jan 2025 09:46:59 +0000 (+0100) Subject: FIX:double-mark-as-successful X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=4cff65edeb302834b5b30c42d91e637f113f17ba;p=demos%2Fkafka%2Ftraining FIX:double-mark-as-successful --- diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java index 556b3b10..b8a226b5 100644 --- a/src/main/java/de/juplo/kafka/BackOffState.java +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -95,9 +95,9 @@ class BackOffState return Duration.between(startTime, timeNextRetryIsDue); } - void markRetryAsSuccessful() + void markRetryAsSuccessful(long offset) { - if (inProgress) + if (inProgress && this.offset == offset) { Instant now = clock.instant(); log.info( diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 2ef4b0ad..51e69ffe 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -185,7 +185,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable log.warn("{} - Ignoring non-retryable error!", id, e); } - backOffState[topicPartition.partition()].markRetryAsSuccessful(); + backOffState[topicPartition.partition()].markRetryAsSuccessful(record.offset()); } } }