Überflüssiges Attribut in `BackOffState` entfernt consumer/spring-consumer--error-handling--2025-02-signal
authorKai Moritz <kai@juplo.de>
Sat, 11 Jan 2025 13:40:44 +0000 (14:40 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Jan 2025 19:42:18 +0000 (20:42 +0100)
src/main/java/de/juplo/kafka/BackOffState.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/test/java/de/juplo/kafka/BackOffStateTest.java

index d653dcd..8c6785e 100644 (file)
@@ -3,7 +3,6 @@ package de.juplo.kafka;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
 import org.springframework.util.backoff.BackOff;
 import org.springframework.util.backoff.BackOffExecution;
 
@@ -16,9 +15,8 @@ import java.time.Instant;
 @Slf4j
 class BackOffState
 {
-  private final String id;
+  private final String logPrefix;
   private final Clock clock;
-  private final TopicPartition topicPartition;
   private final BackOff backOffStrategy;
 
   @Getter
@@ -31,11 +29,7 @@ class BackOffState
   void start(long offset)
   {
     this.offset = offset;
-    log.info(
-      "{} - Back-Off requested for offset={} in {}",
-      id,
-      offset,
-      topicPartition);
+    log.info("{} - Back-Off requested for offset={}", logPrefix, offset);
     backOffExecution = backOffStrategy.start();
     initializeNextBackOff();
   }
@@ -52,11 +46,10 @@ class BackOffState
     if (remaining.isNegative())
     {
       log.info(
-        "{} - {}. retry for offset={} in {}, lateness: {}",
-        id,
+        "{} - {}. retry for offset={}, lateness: {}",
+        logPrefix,
         numRetries,
         offset,
-        topicPartition,
         remaining.abs());
       initializeNextBackOff();
       return false;
@@ -64,10 +57,9 @@ class BackOffState
     else
     {
       log.info(
-        "{} - Next retry for offset={} in {} is due in {}",
-        id,
+        "{} - Next retry for offset={} is due in {}",
+        logPrefix,
         offset,
-        topicPartition,
         remaining);
       return true;
     }
index f4ce531..f2ac444 100644 (file)
@@ -226,9 +226,8 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
   {
     partitions.forEach(topicPartition ->
       backOffState[topicPartition.partition()] = new BackOffState(
-        id,
+        id + " - partition=" + topicPartition.partition(),
         clock,
-        topicPartition,
         backOffStrategy));
   }
 
index 3ab25f4..f58f545 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -20,7 +19,6 @@ import static org.mockito.BDDMockito.given;
 class BackOffStateTest
 {
   final static String ID = "TEST";
-  final static TopicPartition TOPIC_PARTITION = new TopicPartition("test", 0);
   final static long OFFSET = 666;
   final static Instant NOW = Instant.now();
   final static long BACK_OFF = 1000l;
@@ -36,7 +34,7 @@ class BackOffStateTest
     // GIVEN
 
     // WHEN
-    BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+    BackOffState backOffState = new BackOffState(ID, clock, backOff);
 
     return backOffState;
   }
@@ -75,7 +73,7 @@ class BackOffStateTest
     given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
 
     // WHEN
-    BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+    BackOffState backOffState = new BackOffState(ID, clock, backOff);
     backOffState.start(OFFSET);
 
     return backOffState;
@@ -129,7 +127,7 @@ class BackOffStateTest
     given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
 
     // WHEN
-    BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+    BackOffState backOffState = new BackOffState(ID, clock, backOff);
     backOffState.start(OFFSET);
 
     return backOffState;