From 33cfe5c45fc7008c92a8743f54afa017b65a0e46 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 11 Jan 2025 10:53:15 +0100 Subject: [PATCH] =?utf8?q?=C3=9Cberpr=C3=BCfung=20des=20``BackOffState``-Z?= =?utf8?q?ustands=20ber=C3=BCcksichtigt=20immer=20den=20Offset?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- src/main/java/de/juplo/kafka/BackOffState.java | 4 ++-- src/main/java/de/juplo/kafka/ExampleConsumer.java | 2 +- src/test/java/de/juplo/kafka/BackOffStateTest.java | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java index 0015ebec..4981bb6c 100644 --- a/src/main/java/de/juplo/kafka/BackOffState.java +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -90,9 +90,9 @@ class BackOffState return this.offset == offset && backOffExecution != null; } - boolean isCompleted() + boolean isCompleted(long offset) { - return timeNextRetryIsDue == null; + return this.offset == offset && timeNextRetryIsDue == null; } void markAsCompleted() diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index b48baaa4..9e73b119 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -155,7 +155,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable } else { - if (backOffState[partition].isCompleted()) + if (backOffState[partition].isCompleted(record.offset())) { log.warn("{} - Ignoring retryable error: {}", id, e.toString()); } diff --git a/src/test/java/de/juplo/kafka/BackOffStateTest.java b/src/test/java/de/juplo/kafka/BackOffStateTest.java index 624b354b..338631c1 100644 --- a/src/test/java/de/juplo/kafka/BackOffStateTest.java +++ b/src/test/java/de/juplo/kafka/BackOffStateTest.java @@ -108,7 +108,7 @@ class BackOffStateTest // WHEN // THEN - assertThat(backOffState.isCompleted()).isTrue(); + assertThat(backOffState.isCompleted(OFFSET)).isTrue(); } @@ -163,7 +163,7 @@ class BackOffStateTest backOffState.isWaitingForNextRetry(); // THEN - assertThat(backOffState.isCompleted()).isFalse(); + assertThat(backOffState.isCompleted(OFFSET)).isFalse(); } @Test @@ -211,7 +211,7 @@ class BackOffStateTest backOffState.isWaitingForNextRetry(); // THEN - assertThat(backOffState.isCompleted()).isFalse(); + assertThat(backOffState.isCompleted(OFFSET)).isFalse(); } @Test @@ -259,7 +259,7 @@ class BackOffStateTest backOffState.isWaitingForNextRetry(); // THEN - assertThat(backOffState.isCompleted()).isTrue(); + assertThat(backOffState.isCompleted(OFFSET)).isTrue(); } @Test -- 2.20.1