From f44b8ae206399b0ca9c5db90b9bdc8613a7091d4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 11 Jan 2025 11:21:42 +0100 Subject: [PATCH] =?utf8?q?`BackOffState`=20wird=20nur=201x=20erzeugt=20und?= =?utf8?q?=20danach=20zur=C3=BCckgesetzt=20und=20gestartet?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/BackOffState.java | 22 +++++----------- .../java/de/juplo/kafka/ExampleConsumer.java | 10 +++++-- .../java/de/juplo/kafka/BackOffStateTest.java | 26 ++++++++++++++----- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java index 5b41a1c7..d653dcd4 100644 --- a/src/main/java/de/juplo/kafka/BackOffState.java +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -1,6 +1,7 @@ package de.juplo.kafka; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.TopicPartition; import org.springframework.util.backoff.BackOff; @@ -11,35 +12,25 @@ import java.time.Duration; import java.time.Instant; +@RequiredArgsConstructor @Slf4j class BackOffState { private final String id; private final Clock clock; private final TopicPartition topicPartition; - @Getter - private final long offset; + private final BackOff backOffStrategy; + @Getter + private long offset; private BackOffExecution backOffExecution; private int numRetries = 0; private Instant timeNextRetryIsDue; - BackOffState() - { - id = "NONE"; - clock = null; - topicPartition = null; - offset = -1; - } - - BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset) + void start(long offset) { - this.id = id; - this.clock = clock; - this.topicPartition = topicPartition; this.offset = offset; - log.info( "{} - Back-Off requested for offset={} in {}", id, @@ -95,6 +86,7 @@ class BackOffState void reset() { timeNextRetryIsDue = null; + offset = -1l; } private void initializeNextBackOff() diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index b767ca82..f4ce5310 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -149,7 +149,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable if (!backOffState[partition].isStarted(record.offset())) { log.info("{} - First occurrence of a retryable error: {}", id, e.toString()); - backOffState[partition] = new BackOffState(id, backOffStrategy, clock, topicPartition, record.offset()); + backOffState[partition].start(record.offset()); consumer.seek(topicPartition, record.offset()); break; } @@ -224,12 +224,18 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable @Override public void onPartitionsAssigned(Collection partitions) { - partitions.forEach(tp -> backOffState[tp.partition()] = new BackOffState()); + partitions.forEach(topicPartition -> + backOffState[topicPartition.partition()] = new BackOffState( + id, + clock, + topicPartition, + backOffStrategy)); } @Override public void onPartitionsRevoked(Collection partitions) { + partitions.forEach(tp -> backOffState[tp.partition()] = null); } diff --git a/src/test/java/de/juplo/kafka/BackOffStateTest.java b/src/test/java/de/juplo/kafka/BackOffStateTest.java index e77d23f4..3ab25f41 100644 --- a/src/test/java/de/juplo/kafka/BackOffStateTest.java +++ b/src/test/java/de/juplo/kafka/BackOffStateTest.java @@ -33,7 +33,12 @@ class BackOffStateTest private BackOffState NotStartedBackOffState() { - return new BackOffState(); + // GIVEN + + // WHEN + BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff); + + return backOffState; } @Test @@ -69,7 +74,11 @@ class BackOffStateTest given(backOff.start()).willReturn(backOffExecution); given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP); - return new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET); + // WHEN + BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff); + backOffState.start(OFFSET); + + return backOffState; } @Test @@ -114,11 +123,16 @@ class BackOffStateTest private BackOffState StartedBackoffStateWithRetries() { + // GIVEN given(clock.instant()).willReturn(NOW); given(backOff.start()).willReturn(backOffExecution); given(backOffExecution.nextBackOff()).willReturn(BACK_OFF); - return new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET); + // WHEN + BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff); + backOffState.start(OFFSET); + + return backOffState; } @Test @@ -277,8 +291,8 @@ class BackOffStateTest } @Test - @DisplayName("A started BackOffState is started if it is marked as completed") - void StartedBackOffStateIsStartedIfMarkedCompleted() + @DisplayName("A started BackOffState is not started after a reset") + void StartedBackOffStateIsNotStartedAfterReset() { // GIVEN BackOffState backOffState = StartedBackoffStateWithRetries(); @@ -287,6 +301,6 @@ class BackOffStateTest backOffState.reset(); // THEN - assertThat(backOffState.isStarted(OFFSET)).isTrue(); + assertThat(backOffState.isStarted(OFFSET)).isFalse(); } } -- 2.20.1