From 96ede1d44ef4fb9c1bcc6696c90e5ccc203de11b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 10 Jan 2025 10:39:54 +0100 Subject: [PATCH] Innere Klasse `BackOffState` extrahiert --- .../java/de/juplo/kafka/BackOffState.java | 121 ++++++++++++++++++ .../java/de/juplo/kafka/ExampleConsumer.java | 113 ---------------- 2 files changed, 121 insertions(+), 113 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/BackOffState.java diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java new file mode 100644 index 00000000..92d08df0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -0,0 +1,121 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; +import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; + + +@Slf4j +class BackOffState +{ + private final String id; + private final Clock clock; + private final TopicPartition topicPartition; + @Getter + private final long offset; + private final Instant startTime; + + private BackOffExecution backOffExecution; + private int numRetries = 0; + private Instant timeNextRetryIsDue; + + + BackOffState() + { + id = "NONE"; + clock = null; + topicPartition = null; + offset = -1; + startTime = null; + } + + BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset) + { + this.id = id; + this.clock = clock; + this.topicPartition = topicPartition; + this.offset = offset; + this.startTime = clock.instant(); + + log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime); + backOffExecution = backOffStrategy.start(); + initializeNextBackOff(); + } + + boolean isWaitingForNextRetry() + { + if (backOffExecution == null) + { + return false; + } + + if (clock.instant().isAfter(timeNextRetryIsDue)) + { + numRetries++; + initializeNextBackOff(); + log.info("{} - Retrying {}", id, topicPartition); + return false; + } + else + { + log.info("{} - Next retry for {} is due at {}", id, topicPartition, timeNextRetryIsDue); + return true; + } + } + + boolean isRetryInProgress(long offset) + { + return this.offset == offset && timeNextRetryIsDue != null; + } + + boolean isUnsuccessful() + { + return backOffExecution == null && timeNextRetryIsDue != null; + } + + int getNumRetries() + { + return numRetries; + } + + Duration getDurationSpendRetrying() + { + return Duration.between(startTime, timeNextRetryIsDue); + } + + void markRetryAsSuccessful() + { + if (backOffExecution != null) + { + log.info( + "{} - retry #{} for {} succeeded after {}", + id, + numRetries, + topicPartition, + Duration.between(startTime, clock.instant())); + + backOffExecution = null; + timeNextRetryIsDue = null; + } + } + + private void initializeNextBackOff() + { + long backOffMillis = backOffExecution.nextBackOff(); + + if (backOffMillis == BackOffExecution.STOP) + { + backOffExecution = null; + } + else + { + timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis); + } + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 032ff660..2ef4b0ad 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,7 +1,5 @@ package de.juplo.kafka; -import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -11,7 +9,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import org.springframework.util.backoff.BackOff; -import org.springframework.util.backoff.BackOffExecution; import java.time.Clock; import java.time.Duration; @@ -250,114 +247,4 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable consumer.wakeup(); workerThread.join(); } - - - @Slf4j - static class BackOffState - { - private final String id; - private final Clock clock; - private final TopicPartition topicPartition; - @Getter - private final long offset; - private final Instant startTime; - - private BackOffExecution backOffExecution; - private int numRetries = 0; - private Instant timeNextRetryIsDue; - - - BackOffState() - { - id = "NONE"; - clock = null; - topicPartition = null; - offset = -1; - startTime = null; - } - - BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset) - { - this.id = id; - this.clock = clock; - this.topicPartition = topicPartition; - this.offset = offset; - this.startTime = clock.instant(); - - log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime); - backOffExecution = backOffStrategy.start(); - initializeNextBackOff(); - } - - boolean isWaitingForNextRetry() - { - if (backOffExecution == null) - { - return false; - } - - if (clock.instant().isAfter(timeNextRetryIsDue)) - { - numRetries++; - initializeNextBackOff(); - log.info("{} - Retrying {}", id, topicPartition); - return false; - } - else - { - log.info("{} - Next retry for {} is due at {}", id, topicPartition, timeNextRetryIsDue); - return true; - } - } - - boolean isRetryInProgress(long offset) - { - return this.offset == offset && timeNextRetryIsDue != null; - } - - boolean isUnsuccessful() - { - return backOffExecution == null && timeNextRetryIsDue != null; - } - - int getNumRetries() - { - return numRetries; - } - - Duration getDurationSpendRetrying() - { - return Duration.between(startTime, timeNextRetryIsDue); - } - - void markRetryAsSuccessful() - { - if (backOffExecution != null) - { - log.info( - "{} - retry #{} for {} succeeded after {}", - id, - numRetries, - topicPartition, - Duration.between(startTime, clock.instant())); - - backOffExecution = null; - timeNextRetryIsDue = null; - } - } - - private void initializeNextBackOff() - { - long backOffMillis = backOffExecution.nextBackOff(); - - if (backOffMillis == BackOffExecution.STOP) - { - backOffExecution = null; - } - else - { - timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis); - } - } - } } -- 2.20.1