GREEN: Erweitertes Error-Handling implementiert
authorKai Moritz <kai@juplo.de>
Sat, 14 Dec 2024 13:31:24 +0000 (14:31 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 5 Jan 2025 09:49:08 +0000 (10:49 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 16a81a1..38b848e 100644 (file)
@@ -2,25 +2,36 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.BackOffExecution;
 
 import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
 
 @Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
 {
   private final String id;
   private final String topic;
   private final Consumer<String, Long> consumer;
   private final RecordHandler<String, Long> recordHandler;
   private final Thread workerThread;
+  private final Clock clock;
+  private final Duration maxPollInterval;
+  private final Duration minTimeForNextRecord;
+  private final BackOff backOffStrategy;
+  private final BackOffState[] backOffState;
   private final Runnable closeCallback;
 
   private volatile boolean running = false;
@@ -42,6 +53,14 @@ public class ExampleConsumer implements Runnable
     this.topic = topic;
     this.consumer = consumer;
     this.recordHandler = recordHandler;
+    this.clock = clock;
+    this.maxPollInterval = maxPollInterval;
+    this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2);
+    this.backOffStrategy = backOffStrategy;
+
+    int numPartitions = consumer.partitionsFor(topic).size();
+    log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+    this.backOffState = new BackOffState[numPartitions];
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
@@ -56,7 +75,7 @@ public class ExampleConsumer implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      consumer.subscribe(Arrays.asList(topic), this);
       running = true;
 
       while (running)
@@ -67,14 +86,100 @@ public class ExampleConsumer implements Runnable
             consumer.poll(Duration.ofSeconds(1));
 
           log.info("{} - Received {} messages", id, records.count());
-          for (ConsumerRecord<String, Long> record : records)
+
+          Instant deadline = clock.instant().plus(maxPollInterval);
+          boolean abortCurrentPoll = false;
+
+          for (TopicPartition topicPartition : records.partitions())
           {
-            handleRecord(
-              record.topic(),
-              record.partition(),
-              record.offset(),
-              record.key(),
-              record.value());
+            if (backOffState[topicPartition.partition()].isWaitingForNextRetry())
+            {
+              log.info("{} - {} is blocked, because it is waiting for a retry", id, topicPartition);
+              consumer.seek(topicPartition, backOffState[topicPartition.partition()].offset);
+              continue;
+            }
+
+            List<ConsumerRecord<String, Long>> recordsForPartition = records.records(topicPartition);
+            log.debug(
+              "{} - Received {} messages for partition {}",
+              id,
+              recordsForPartition.size(),
+              topicPartition);
+
+            boolean partitionHasRetryableError = false;
+
+            for (ConsumerRecord<String, Long> record : recordsForPartition)
+            {
+              if (abortCurrentPoll)
+              {
+                consumer.seek(topicPartition, record.offset());
+                break;
+              }
+
+              Instant now = clock.instant();
+              Duration timeLeft = Duration.between(now, deadline);
+
+              if (timeLeft.minus(minTimeForNextRecord).isNegative())
+              {
+                log.info(
+                  "{} - Aborting record handling, because only {} are left until the poll-interval expires!",
+                  id,
+                  timeLeft);
+                abortCurrentPoll = true;
+                consumer.seek(topicPartition, record.offset());
+                break;
+              }
+
+              try
+              {
+                handleRecord(
+                  record.topic(),
+                  record.partition(),
+                  record.offset(),
+                  record.key(),
+                  record.value());
+              }
+              catch (RetriableErrorException e)
+              {
+                // Seeking to the offset of the record, that raised the exception, and
+                // leaving the loop afterwards, retries the record
+                int partition = topicPartition.partition();
+                if (!backOffState[partition].isRetryInProgress(record.offset()))
+                {
+                  backOffState[partition] = new BackOffState(topicPartition, record.offset());
+                  partitionHasRetryableError = true;
+                  consumer.seek(topicPartition, record.offset());
+                  break;
+                }
+                else
+                {
+                  if (backOffState[partition].isUnsuccessful())
+                  {
+                    log.warn(
+                      "{} - Ignoring retryable error after {} attempts and {}",
+                      id,
+                      backOffState[partition].getNumRetries(),
+                      backOffState[partition].getDurationSpendRetrying());
+                  }
+                  else
+                  {
+                    consumer.seek(topicPartition, record.offset());
+                    partitionHasRetryableError = true;
+                    break;
+                  }
+                }
+              }
+              catch (NonRetriableErrorException e)
+              {
+                // Just ignore, to skip
+                log.warn("{} - Ignoring non-retryable error!", id, e);
+              }
+
+              if (!partitionHasRetryableError)
+              {
+                backOffState[topicPartition.partition()].markRetryAsSuccessful();
+              }
+            }
           }
         }
         catch(RecordDeserializationException e)
@@ -116,6 +221,18 @@ public class ExampleConsumer implements Runnable
   }
 
 
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp -> backOffState[tp.partition()] = new BackOffState());
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+  }
+
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} joining the worker-thread...", id);
@@ -123,4 +240,106 @@ public class ExampleConsumer implements Runnable
     consumer.wakeup();
     workerThread.join();
   }
+
+
+  private class BackOffState
+  {
+    private final TopicPartition topicPartition;
+    private final long offset;
+    private final Instant startTime;
+
+    private BackOffExecution backOffExecution;
+    private int numRetries = 0;
+    private Instant timeNextRetryIsDue;
+
+
+    BackOffState()
+    {
+      topicPartition = null;
+      offset = 0;
+      startTime = null;
+    }
+
+    BackOffState(TopicPartition topicPartition, long offset)
+    {
+      this.topicPartition = topicPartition;
+      this.offset = offset;
+      this.startTime = clock.instant();
+
+      log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
+      backOffExecution = backOffStrategy.start();
+      initializeNextBackOff();
+    }
+
+    boolean isWaitingForNextRetry()
+    {
+      if (backOffExecution == null)
+      {
+        return false;
+      }
+
+      if (clock.instant().isAfter(timeNextRetryIsDue))
+      {
+        numRetries++;
+        initializeNextBackOff();
+        log.info("{} - Retrying {}", id, topicPartition);
+        return false;
+      }
+      else
+      {
+        log.info("{} - Next retry for {} is due at {}", id, topicPartition, timeNextRetryIsDue);
+        return true;
+      }
+    }
+
+    boolean isRetryInProgress(long offset)
+    {
+      return this.offset == offset && timeNextRetryIsDue != null;
+    }
+
+    boolean isUnsuccessful()
+    {
+      return backOffExecution == null && timeNextRetryIsDue != null;
+    }
+
+    int getNumRetries()
+    {
+      return numRetries;
+    }
+
+    Duration getDurationSpendRetrying()
+    {
+      return Duration.between(startTime, timeNextRetryIsDue);
+    }
+
+    void markRetryAsSuccessful()
+    {
+      if (backOffExecution != null)
+      {
+        log.info(
+          "{} - retry #{} for {} succeeded after {}",
+          id,
+          numRetries,
+          topicPartition,
+          Duration.between(startTime, clock.instant()));
+
+        backOffExecution = null;
+        timeNextRetryIsDue = null;
+      }
+    }
+
+    private void initializeNextBackOff()
+    {
+      long backOffMillis = backOffExecution.nextBackOff();
+
+      if (backOffMillis == BackOffExecution.STOP)
+      {
+        backOffExecution = null;
+      }
+      else
+      {
+        timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
+      }
+    }
+  }
 }