From 5b0f44a14b6ccc9cbc15d37f975c432580dab15b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 10 Jan 2025 10:28:44 +0100 Subject: [PATCH] Innere Klasse `BackOffState` statisch gemacht --- .../java/de/juplo/kafka/ExampleConsumer.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index fe5403f8..032ff660 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,5 +1,7 @@ package de.juplo.kafka; +import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -99,7 +101,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable if (backOffState[topicPartition.partition()].isWaitingForNextRetry()) { log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition); - consumer.seek(topicPartition, backOffState[topicPartition.partition()].offset); + consumer.seek(topicPartition, backOffState[topicPartition.partition()].getOffset()); continue; } @@ -154,7 +156,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable id, record.offset(), partition); - backOffState[partition] = new BackOffState(topicPartition, record.offset()); + backOffState[partition] = new BackOffState(id, backOffStrategy, clock, topicPartition, record.offset()); consumer.seek(topicPartition, record.offset()); break; } @@ -250,9 +252,13 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable } - private class BackOffState + @Slf4j + static class BackOffState { + private final String id; + private final Clock clock; private final TopicPartition topicPartition; + @Getter private final long offset; private final Instant startTime; @@ -263,13 +269,17 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable BackOffState() { + id = "NONE"; + clock = null; topicPartition = null; offset = -1; startTime = null; } - BackOffState(TopicPartition topicPartition, long offset) + BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset) { + this.id = id; + this.clock = clock; this.topicPartition = topicPartition; this.offset = offset; this.startTime = clock.instant(); -- 2.20.1