From 1426195ca6ccdb2d1cb6fb1287324de8a6f59e47 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 11 Jan 2025 14:40:44 +0100 Subject: [PATCH] =?utf8?q?=C3=9Cberfl=C3=BCssiges=20Attribut=20in=20`BackO?= =?utf8?q?ffState`=20entfernt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/BackOffState.java | 20 ++++++------------- .../java/de/juplo/kafka/ExampleConsumer.java | 3 +-- .../java/de/juplo/kafka/BackOffStateTest.java | 8 +++----- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java index d653dcd4..8c6785e3 100644 --- a/src/main/java/de/juplo/kafka/BackOffState.java +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -3,7 +3,6 @@ package de.juplo.kafka; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.TopicPartition; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.BackOffExecution; @@ -16,9 +15,8 @@ import java.time.Instant; @Slf4j class BackOffState { - private final String id; + private final String logPrefix; private final Clock clock; - private final TopicPartition topicPartition; private final BackOff backOffStrategy; @Getter @@ -31,11 +29,7 @@ class BackOffState void start(long offset) { this.offset = offset; - log.info( - "{} - Back-Off requested for offset={} in {}", - id, - offset, - topicPartition); + log.info("{} - Back-Off requested for offset={}", logPrefix, offset); backOffExecution = backOffStrategy.start(); initializeNextBackOff(); } @@ -52,11 +46,10 @@ class BackOffState if (remaining.isNegative()) { log.info( - "{} - {}. retry for offset={} in {}, lateness: {}", - id, + "{} - {}. retry for offset={}, lateness: {}", + logPrefix, numRetries, offset, - topicPartition, remaining.abs()); initializeNextBackOff(); return false; @@ -64,10 +57,9 @@ class BackOffState else { log.info( - "{} - Next retry for offset={} in {} is due in {}", - id, + "{} - Next retry for offset={} is due in {}", + logPrefix, offset, - topicPartition, remaining); return true; } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index f4ce5310..f2ac444f 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -226,9 +226,8 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable { partitions.forEach(topicPartition -> backOffState[topicPartition.partition()] = new BackOffState( - id, + id + " - partition=" + topicPartition.partition(), clock, - topicPartition, backOffStrategy)); } diff --git a/src/test/java/de/juplo/kafka/BackOffStateTest.java b/src/test/java/de/juplo/kafka/BackOffStateTest.java index 3ab25f41..f58f545b 100644 --- a/src/test/java/de/juplo/kafka/BackOffStateTest.java +++ b/src/test/java/de/juplo/kafka/BackOffStateTest.java @@ -1,6 +1,5 @@ package de.juplo.kafka; -import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -20,7 +19,6 @@ import static org.mockito.BDDMockito.given; class BackOffStateTest { final static String ID = "TEST"; - final static TopicPartition TOPIC_PARTITION = new TopicPartition("test", 0); final static long OFFSET = 666; final static Instant NOW = Instant.now(); final static long BACK_OFF = 1000l; @@ -36,7 +34,7 @@ class BackOffStateTest // GIVEN // WHEN - BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff); + BackOffState backOffState = new BackOffState(ID, clock, backOff); return backOffState; } @@ -75,7 +73,7 @@ class BackOffStateTest given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP); // WHEN - BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff); + BackOffState backOffState = new BackOffState(ID, clock, backOff); backOffState.start(OFFSET); return backOffState; @@ -129,7 +127,7 @@ class BackOffStateTest given(backOffExecution.nextBackOff()).willReturn(BACK_OFF); // WHEN - BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff); + BackOffState backOffState = new BackOffState(ID, clock, backOff); backOffState.start(OFFSET); return backOffState; -- 2.20.1