--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
+
+@Slf4j
+class BackOffState
+{
+ private final String id;
+ private final Clock clock;
+ private final TopicPartition topicPartition;
+ @Getter
+ private final long offset;
+ private final Instant startTime;
+
+ private BackOffExecution backOffExecution;
+ private int numRetries = 0;
+ private Instant timeNextRetryIsDue;
+
+
+ BackOffState()
+ {
+ id = "NONE";
+ clock = null;
+ topicPartition = null;
+ offset = -1;
+ startTime = null;
+ }
+
+ BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset)
+ {
+ this.id = id;
+ this.clock = clock;
+ this.topicPartition = topicPartition;
+ this.offset = offset;
+ this.startTime = clock.instant();
+
+ log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
+ backOffExecution = backOffStrategy.start();
+ initializeNextBackOff();
+ }
+
+ boolean isWaitingForNextRetry()
+ {
+ if (backOffExecution == null)
+ {
+ return false;
+ }
+
+ if (clock.instant().isAfter(timeNextRetryIsDue))
+ {
+ numRetries++;
+ initializeNextBackOff();
+ log.info("{} - Retrying {}", id, topicPartition);
+ return false;
+ }
+ else
+ {
+ log.info("{} - Next retry for {} is due at {}", id, topicPartition, timeNextRetryIsDue);
+ return true;
+ }
+ }
+
+ boolean isRetryInProgress(long offset)
+ {
+ return this.offset == offset && timeNextRetryIsDue != null;
+ }
+
+ boolean isUnsuccessful()
+ {
+ return backOffExecution == null && timeNextRetryIsDue != null;
+ }
+
+ int getNumRetries()
+ {
+ return numRetries;
+ }
+
+ Duration getDurationSpendRetrying()
+ {
+ return Duration.between(startTime, timeNextRetryIsDue);
+ }
+
+ void markRetryAsSuccessful()
+ {
+ if (backOffExecution != null)
+ {
+ log.info(
+ "{} - retry #{} for {} succeeded after {}",
+ id,
+ numRetries,
+ topicPartition,
+ Duration.between(startTime, clock.instant()));
+
+ backOffExecution = null;
+ timeNextRetryIsDue = null;
+ }
+ }
+
+ private void initializeNextBackOff()
+ {
+ long backOffMillis = backOffExecution.nextBackOff();
+
+ if (backOffMillis == BackOffExecution.STOP)
+ {
+ backOffExecution = null;
+ }
+ else
+ {
+ timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
+ }
+ }
+}
package de.juplo.kafka;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.util.backoff.BackOff;
-import org.springframework.util.backoff.BackOffExecution;
import java.time.Clock;
import java.time.Duration;
consumer.wakeup();
workerThread.join();
}
-
-
- @Slf4j
- static class BackOffState
- {
- private final String id;
- private final Clock clock;
- private final TopicPartition topicPartition;
- @Getter
- private final long offset;
- private final Instant startTime;
-
- private BackOffExecution backOffExecution;
- private int numRetries = 0;
- private Instant timeNextRetryIsDue;
-
-
- BackOffState()
- {
- id = "NONE";
- clock = null;
- topicPartition = null;
- offset = -1;
- startTime = null;
- }
-
- BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset)
- {
- this.id = id;
- this.clock = clock;
- this.topicPartition = topicPartition;
- this.offset = offset;
- this.startTime = clock.instant();
-
- log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
- backOffExecution = backOffStrategy.start();
- initializeNextBackOff();
- }
-
- boolean isWaitingForNextRetry()
- {
- if (backOffExecution == null)
- {
- return false;
- }
-
- if (clock.instant().isAfter(timeNextRetryIsDue))
- {
- numRetries++;
- initializeNextBackOff();
- log.info("{} - Retrying {}", id, topicPartition);
- return false;
- }
- else
- {
- log.info("{} - Next retry for {} is due at {}", id, topicPartition, timeNextRetryIsDue);
- return true;
- }
- }
-
- boolean isRetryInProgress(long offset)
- {
- return this.offset == offset && timeNextRetryIsDue != null;
- }
-
- boolean isUnsuccessful()
- {
- return backOffExecution == null && timeNextRetryIsDue != null;
- }
-
- int getNumRetries()
- {
- return numRetries;
- }
-
- Duration getDurationSpendRetrying()
- {
- return Duration.between(startTime, timeNextRetryIsDue);
- }
-
- void markRetryAsSuccessful()
- {
- if (backOffExecution != null)
- {
- log.info(
- "{} - retry #{} for {} succeeded after {}",
- id,
- numRetries,
- topicPartition,
- Duration.between(startTime, clock.instant()));
-
- backOffExecution = null;
- timeNextRetryIsDue = null;
- }
- }
-
- private void initializeNextBackOff()
- {
- long backOffMillis = backOffExecution.nextBackOff();
-
- if (backOffMillis == BackOffExecution.STOP)
- {
- backOffExecution = null;
- }
- else
- {
- timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
- }
- }
- }
}