Innere Klasse `BackOffState` statisch gemacht
authorKai Moritz <kai@juplo.de>
Fri, 10 Jan 2025 09:28:44 +0000 (10:28 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 10 Jan 2025 09:38:38 +0000 (10:38 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index fe5403f..032ff66 100644 (file)
@@ -1,5 +1,7 @@
 package de.juplo.kafka;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -99,7 +101,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
             if (backOffState[topicPartition.partition()].isWaitingForNextRetry())
             {
               log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition);
-              consumer.seek(topicPartition, backOffState[topicPartition.partition()].offset);
+              consumer.seek(topicPartition, backOffState[topicPartition.partition()].getOffset());
               continue;
             }
 
@@ -154,7 +156,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
                     id,
                     record.offset(),
                     partition);
-                  backOffState[partition] = new BackOffState(topicPartition, record.offset());
+                  backOffState[partition] = new BackOffState(id, backOffStrategy, clock, topicPartition, record.offset());
                   consumer.seek(topicPartition, record.offset());
                   break;
                 }
@@ -250,9 +252,13 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
   }
 
 
-  private class BackOffState
+  @Slf4j
+  static class BackOffState
   {
+    private final String id;
+    private final Clock clock;
     private final TopicPartition topicPartition;
+    @Getter
     private final long offset;
     private final Instant startTime;
 
@@ -263,13 +269,17 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
 
     BackOffState()
     {
+      id = "NONE";
+      clock = null;
       topicPartition = null;
       offset = -1;
       startTime = null;
     }
 
-    BackOffState(TopicPartition topicPartition, long offset)
+    BackOffState(String id, BackOff backOffStrategy, Clock clock, TopicPartition topicPartition, long offset)
     {
+      this.id = id;
+      this.clock = clock;
       this.topicPartition = topicPartition;
       this.offset = offset;
       this.startTime = clock.instant();