GRÜN: Verhalten bei noch nicht geschriebenem Offset angepasst
authorKai Moritz <kai@juplo.de>
Sun, 6 Apr 2025 11:12:08 +0000 (13:12 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 21 May 2025 18:14:13 +0000 (20:14 +0200)
README.sh
src/main/java/de/juplo/kafka/DeadLetterConsumer.java
src/main/java/de/juplo/kafka/DeadLetterController.java

index 98dcff5..9cec8ef 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -30,6 +30,7 @@ docker compose -f docker/docker-compose.yml up -d producer consumer nodlt
 while ! [[ $(http 0:8881/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for nodlt..."; sleep 1; done
 
 http -v :8881/0/0
+http -v :8881/1/666
 http -v :8881/1/3
 
 docker compose -f docker/docker-compose.yml stop producer consumer
index 8cd553e..4d94569 100644 (file)
@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Mono;
@@ -128,6 +129,24 @@ public class DeadLetterConsumer implements Runnable
 
           consumer.resume(partitionsToResume);
         }
+        catch (OffsetOutOfRangeException e)
+        {
+          List<TopicPartition> partitionsToPause = new LinkedList<>();
+
+          e.offsetOutOfRangePartitions().forEach((topicPartition, offset) ->
+          {
+            FetchRequest failedFetchRequest = currentFetchRequest[topicPartition.partition()];
+            log.error("{} - {} does not yet exist: {}", id, failedFetchRequest, e.toString());
+            failedFetchRequest.future().completeExceptionally(e);
+            Optional<FetchRequest> fetchRequestMono = schedulePendingFetchRequest(topicPartition.partition(), true);
+            if (fetchRequestMono.isEmpty())
+            {
+              partitionsToPause.add(topicPartition);
+            }
+          });
+
+          consumer.pause(partitionsToPause);
+        }
       }
     }
     catch(Exception e)
@@ -145,9 +164,15 @@ public class DeadLetterConsumer implements Runnable
     }
   }
 
+
   private Optional<FetchRequest> schedulePendingFetchRequest(int partition)
   {
-    if (currentFetchRequest[partition] == null)
+    return schedulePendingFetchRequest(partition, false);
+  }
+
+  private Optional<FetchRequest> schedulePendingFetchRequest(int partition, boolean force)
+  {
+    if (force || currentFetchRequest[partition] == null)
     {
       FetchRequest nextFetchRequest = pendingFetchRequests[partition].poll();
       if (nextFetchRequest != null)
@@ -158,6 +183,7 @@ public class DeadLetterConsumer implements Runnable
       else
       {
         log.trace("{} - no pending fetch-request for partition {}.", id, partition);
+        currentFetchRequest[partition] = null;
       }
     }
     else
index e8da81c..568c610 100644 (file)
@@ -1,9 +1,9 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Mono;
 
 
@@ -21,4 +21,10 @@ public class DeadLetterController
   {
     return deadLetterConsumer.requestRecord(partition, offset);
   }
+
+  @ResponseStatus(value= HttpStatus.NOT_FOUND)
+  @ExceptionHandler(OffsetOutOfRangeException.class)
+  public void notFound(OffsetOutOfRangeException e)
+  {
+  }
 }