Innere Klasse `BackOffState` extrahiert
authorKai Moritz <kai@juplo.de>
Fri, 10 Jan 2025 09:39:54 +0000 (10:39 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 10 Jan 2025 09:39:54 +0000 (10:39 +0100)
src/main/java/de/juplo/kafka/BackOffState.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ExampleConsumer.java

diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java
new file mode 100644 (file)
index 0000000..92d08df
--- /dev/null
@@ -0,0 +1,121 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
+
+@Slf4j
+class BackOffState
+{
+  private final String id;
+  private final Clock clock;
+  private final TopicPartition topicPartition;
+  @Getter
+  private final long offset;
+  private final Instant startTime;
+
+  private BackOffExecution backOffExecution;
+  private int numRetries = 0;
+  private Instant timeNextRetryIsDue;
+
+
+  BackOffState()
+  {
+    id = "NONE";
+    clock = null;
+    topicPartition = null;
+    offset = -1;
+    startTime = null;
+  }
+
+  BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset)
+  {
+    this.id = id;
+    this.clock = clock;
+    this.topicPartition = topicPartition;
+    this.offset = offset;
+    this.startTime = clock.instant();
+
+    log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
+    backOffExecution = backOffStrategy.start();
+    initializeNextBackOff();
+  }
+
+  boolean isWaitingForNextRetry()
+  {
+    if (backOffExecution == null)
+    {
+      return false;
+    }
+
+    if (clock.instant().isAfter(timeNextRetryIsDue))
+    {
+      numRetries++;
+      initializeNextBackOff();
+      log.info("{} - Retrying {}", id, topicPartition);
+      return false;
+    }
+    else
+    {
+      log.info("{} - Next retry for {} is due at {}", id, topicPartition, timeNextRetryIsDue);
+      return true;
+    }
+  }
+
+  boolean isRetryInProgress(long offset)
+  {
+    return this.offset == offset && timeNextRetryIsDue != null;
+  }
+
+  boolean isUnsuccessful()
+  {
+    return backOffExecution == null && timeNextRetryIsDue != null;
+  }
+
+  int getNumRetries()
+  {
+    return numRetries;
+  }
+
+  Duration getDurationSpendRetrying()
+  {
+    return Duration.between(startTime, timeNextRetryIsDue);
+  }
+
+  void markRetryAsSuccessful()
+  {
+    if (backOffExecution != null)
+    {
+      log.info(
+        "{} - retry #{} for {} succeeded after {}",
+        id,
+        numRetries,
+        topicPartition,
+        Duration.between(startTime, clock.instant()));
+
+      backOffExecution = null;
+      timeNextRetryIsDue = null;
+    }
+  }
+
+  private void initializeNextBackOff()
+  {
+    long backOffMillis = backOffExecution.nextBackOff();
+
+    if (backOffMillis == BackOffExecution.STOP)
+    {
+      backOffExecution = null;
+    }
+    else
+    {
+      timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
+    }
+  }
+}
index 032ff66..2ef4b0a 100644 (file)
@@ -1,7 +1,5 @@
 package de.juplo.kafka;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -11,7 +9,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.springframework.util.backoff.BackOff;
-import org.springframework.util.backoff.BackOffExecution;
 
 import java.time.Clock;
 import java.time.Duration;
@@ -250,114 +247,4 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
     consumer.wakeup();
     workerThread.join();
   }
-
-
-  @Slf4j
-  static class BackOffState
-  {
-    private final String id;
-    private final Clock clock;
-    private final TopicPartition topicPartition;
-    @Getter
-    private final long offset;
-    private final Instant startTime;
-
-    private BackOffExecution backOffExecution;
-    private int numRetries = 0;
-    private Instant timeNextRetryIsDue;
-
-
-    BackOffState()
-    {
-      id = "NONE";
-      clock = null;
-      topicPartition = null;
-      offset = -1;
-      startTime = null;
-    }
-
-    BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset)
-    {
-      this.id = id;
-      this.clock = clock;
-      this.topicPartition = topicPartition;
-      this.offset = offset;
-      this.startTime = clock.instant();
-
-      log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
-      backOffExecution = backOffStrategy.start();
-      initializeNextBackOff();
-    }
-
-    boolean isWaitingForNextRetry()
-    {
-      if (backOffExecution == null)
-      {
-        return false;
-      }
-
-      if (clock.instant().isAfter(timeNextRetryIsDue))
-      {
-        numRetries++;
-        initializeNextBackOff();
-        log.info("{} - Retrying {}", id, topicPartition);
-        return false;
-      }
-      else
-      {
-        log.info("{} - Next retry for {} is due at {}", id, topicPartition, timeNextRetryIsDue);
-        return true;
-      }
-    }
-
-    boolean isRetryInProgress(long offset)
-    {
-      return this.offset == offset && timeNextRetryIsDue != null;
-    }
-
-    boolean isUnsuccessful()
-    {
-      return backOffExecution == null && timeNextRetryIsDue != null;
-    }
-
-    int getNumRetries()
-    {
-      return numRetries;
-    }
-
-    Duration getDurationSpendRetrying()
-    {
-      return Duration.between(startTime, timeNextRetryIsDue);
-    }
-
-    void markRetryAsSuccessful()
-    {
-      if (backOffExecution != null)
-      {
-        log.info(
-          "{} - retry #{} for {} succeeded after {}",
-          id,
-          numRetries,
-          topicPartition,
-          Duration.between(startTime, clock.instant()));
-
-        backOffExecution = null;
-        timeNextRetryIsDue = null;
-      }
-    }
-
-    private void initializeNextBackOff()
-    {
-      long backOffMillis = backOffExecution.nextBackOff();
-
-      if (backOffMillis == BackOffExecution.STOP)
-      {
-        backOffExecution = null;
-      }
-      else
-      {
-        timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
-      }
-    }
-  }
 }