From: Kai Moritz Date: Sat, 11 Jan 2025 09:53:15 +0000 (+0100) Subject: Überprüfung des ``BackOffState``-Zustands berücksichtigt immer den Offset X-Git-Tag: consumer/spring-consumer--error-handling--2025-02-signal~4 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=ff1fd1397b353c6412f4222170510a03c5f42db1;p=demos%2Fkafka%2Ftraining Überprüfung des ``BackOffState``-Zustands berücksichtigt immer den Offset --- diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java index 0015ebec..4981bb6c 100644 --- a/src/main/java/de/juplo/kafka/BackOffState.java +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -90,9 +90,9 @@ class BackOffState return this.offset == offset && backOffExecution != null; } - boolean isCompleted() + boolean isCompleted(long offset) { - return timeNextRetryIsDue == null; + return this.offset == offset && timeNextRetryIsDue == null; } void markAsCompleted() diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index b48baaa4..9e73b119 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -155,7 +155,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable } else { - if (backOffState[partition].isCompleted()) + if (backOffState[partition].isCompleted(record.offset())) { log.warn("{} - Ignoring retryable error: {}", id, e.toString()); } diff --git a/src/test/java/de/juplo/kafka/BackOffStateTest.java b/src/test/java/de/juplo/kafka/BackOffStateTest.java index 624b354b..338631c1 100644 --- a/src/test/java/de/juplo/kafka/BackOffStateTest.java +++ b/src/test/java/de/juplo/kafka/BackOffStateTest.java @@ -108,7 +108,7 @@ class BackOffStateTest // WHEN // THEN - assertThat(backOffState.isCompleted()).isTrue(); + assertThat(backOffState.isCompleted(OFFSET)).isTrue(); } @@ -163,7 +163,7 @@ class BackOffStateTest backOffState.isWaitingForNextRetry(); // THEN - assertThat(backOffState.isCompleted()).isFalse(); + assertThat(backOffState.isCompleted(OFFSET)).isFalse(); } @Test @@ -211,7 +211,7 @@ class BackOffStateTest backOffState.isWaitingForNextRetry(); // THEN - assertThat(backOffState.isCompleted()).isFalse(); + assertThat(backOffState.isCompleted(OFFSET)).isFalse(); } @Test @@ -259,7 +259,7 @@ class BackOffStateTest backOffState.isWaitingForNextRetry(); // THEN - assertThat(backOffState.isCompleted()).isTrue(); + assertThat(backOffState.isCompleted(OFFSET)).isTrue(); } @Test