From 46b56e2268ba36751d9ae3d2d87da5d0b2c917c5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 6 Apr 2025 13:12:08 +0200 Subject: [PATCH] =?utf8?q?GR=C3=9CN:=20Verhalten=20bei=20noch=20nicht=20ge?= =?utf8?q?schriebenem=20Offset=20angepasst?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 1 + .../de/juplo/kafka/DeadLetterConsumer.java | 28 ++++++++++++++++++- .../de/juplo/kafka/DeadLetterController.java | 12 ++++++-- 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/README.sh b/README.sh index 98dcff5..9cec8ef 100755 --- a/README.sh +++ b/README.sh @@ -30,6 +30,7 @@ docker compose -f docker/docker-compose.yml up -d producer consumer nodlt while ! [[ $(http 0:8881/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for nodlt..."; sleep 1; done http -v :8881/0/0 +http -v :8881/1/666 http -v :8881/1/3 docker compose -f docker/docker-compose.yml stop producer consumer diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index 8cd553e..4d94569 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Mono; @@ -128,6 +129,24 @@ public class DeadLetterConsumer implements Runnable consumer.resume(partitionsToResume); } + catch (OffsetOutOfRangeException e) + { + List partitionsToPause = new LinkedList<>(); + + e.offsetOutOfRangePartitions().forEach((topicPartition, offset) -> + { + FetchRequest failedFetchRequest = currentFetchRequest[topicPartition.partition()]; + log.error("{} - {} does not yet exist: {}", id, failedFetchRequest, e.toString()); + failedFetchRequest.future().completeExceptionally(e); + Optional fetchRequestMono = schedulePendingFetchRequest(topicPartition.partition(), true); + if (fetchRequestMono.isEmpty()) + { + partitionsToPause.add(topicPartition); + } + }); + + consumer.pause(partitionsToPause); + } } } catch(Exception e) @@ -145,9 +164,15 @@ public class DeadLetterConsumer implements Runnable } } + private Optional schedulePendingFetchRequest(int partition) { - if (currentFetchRequest[partition] == null) + return schedulePendingFetchRequest(partition, false); + } + + private Optional schedulePendingFetchRequest(int partition, boolean force) + { + if (force || currentFetchRequest[partition] == null) { FetchRequest nextFetchRequest = pendingFetchRequests[partition].poll(); if (nextFetchRequest != null) @@ -158,6 +183,7 @@ public class DeadLetterConsumer implements Runnable else { log.trace("{} - no pending fetch-request for partition {}.", id, partition); + currentFetchRequest[partition] = null; } } else diff --git a/src/main/java/de/juplo/kafka/DeadLetterController.java b/src/main/java/de/juplo/kafka/DeadLetterController.java index e8da81c..568c610 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterController.java +++ b/src/main/java/de/juplo/kafka/DeadLetterController.java @@ -1,9 +1,9 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Mono; @@ -21,4 +21,10 @@ public class DeadLetterController { return deadLetterConsumer.requestRecord(partition, offset); } + + @ResponseStatus(value= HttpStatus.NOT_FOUND) + @ExceptionHandler(OffsetOutOfRangeException.class) + public void notFound(OffsetOutOfRangeException e) + { + } } -- 2.20.1