`BackOffState` wird nur 1x erzeugt und danach zurückgesetzt und gestartet
authorKai Moritz <kai@juplo.de>
Sat, 11 Jan 2025 10:21:42 +0000 (11:21 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 6 Feb 2025 17:04:39 +0000 (18:04 +0100)
src/main/java/de/juplo/kafka/BackOffState.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/test/java/de/juplo/kafka/BackOffStateTest.java

index 5b41a1c..d653dcd 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.util.backoff.BackOff;
@@ -11,35 +12,25 @@ import java.time.Duration;
 import java.time.Instant;
 
 
+@RequiredArgsConstructor
 @Slf4j
 class BackOffState
 {
   private final String id;
   private final Clock clock;
   private final TopicPartition topicPartition;
-  @Getter
-  private final long offset;
+  private final BackOff backOffStrategy;
 
+  @Getter
+  private long offset;
   private BackOffExecution backOffExecution;
   private int numRetries = 0;
   private Instant timeNextRetryIsDue;
 
 
-  BackOffState()
-  {
-    id = "NONE";
-    clock = null;
-    topicPartition = null;
-    offset = -1;
-  }
-
-  BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset)
+  void start(long offset)
   {
-    this.id = id;
-    this.clock = clock;
-    this.topicPartition = topicPartition;
     this.offset = offset;
-
     log.info(
       "{} - Back-Off requested for offset={} in {}",
       id,
@@ -95,6 +86,7 @@ class BackOffState
   void reset()
   {
     timeNextRetryIsDue = null;
+    offset = -1l;
   }
 
   private void initializeNextBackOff()
index b767ca8..f4ce531 100644 (file)
@@ -149,7 +149,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
                 if (!backOffState[partition].isStarted(record.offset()))
                 {
                   log.info("{} - First occurrence of a retryable error: {}", id, e.toString());
-                  backOffState[partition] = new BackOffState(id, backOffStrategy, clock, topicPartition, record.offset());
+                  backOffState[partition].start(record.offset());
                   consumer.seek(topicPartition, record.offset());
                   break;
                 }
@@ -224,12 +224,18 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
-    partitions.forEach(tp -> backOffState[tp.partition()] = new BackOffState());
+    partitions.forEach(topicPartition ->
+      backOffState[topicPartition.partition()] = new BackOffState(
+        id,
+        clock,
+        topicPartition,
+        backOffStrategy));
   }
 
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
+    partitions.forEach(tp -> backOffState[tp.partition()] = null);
   }
 
 
index e77d23f..3ab25f4 100644 (file)
@@ -33,7 +33,12 @@ class BackOffStateTest
 
   private BackOffState NotStartedBackOffState()
   {
-    return new BackOffState();
+    // GIVEN
+
+    // WHEN
+    BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+
+    return backOffState;
   }
 
   @Test
@@ -69,7 +74,11 @@ class BackOffStateTest
     given(backOff.start()).willReturn(backOffExecution);
     given(backOffExecution.nextBackOff()).willReturn(BackOffExecution.STOP);
 
-    return new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+    // WHEN
+    BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+    backOffState.start(OFFSET);
+
+    return backOffState;
   }
 
   @Test
@@ -114,11 +123,16 @@ class BackOffStateTest
 
   private BackOffState StartedBackoffStateWithRetries()
   {
+    // GIVEN
     given(clock.instant()).willReturn(NOW);
     given(backOff.start()).willReturn(backOffExecution);
     given(backOffExecution.nextBackOff()).willReturn(BACK_OFF);
 
-    return new BackOffState(ID, backOff, clock, TOPIC_PARTITION, OFFSET);
+    // WHEN
+    BackOffState backOffState = new BackOffState(ID, clock, TOPIC_PARTITION, backOff);
+    backOffState.start(OFFSET);
+
+    return backOffState;
   }
 
   @Test
@@ -277,8 +291,8 @@ class BackOffStateTest
   }
 
   @Test
-  @DisplayName("A started BackOffState is started if it is marked as completed")
-  void StartedBackOffStateIsStartedIfMarkedCompleted()
+  @DisplayName("A started BackOffState is not started after a reset")
+  void StartedBackOffStateIsNotStartedAfterReset()
   {
     // GIVEN
     BackOffState backOffState = StartedBackoffStateWithRetries();
@@ -287,6 +301,6 @@ class BackOffStateTest
     backOffState.reset();
 
     // THEN
-    assertThat(backOffState.isStarted(OFFSET)).isTrue();
+    assertThat(backOffState.isStarted(OFFSET)).isFalse();
   }
 }