From: Kai Moritz Date: Mon, 7 Apr 2025 17:48:24 +0000 (+0200) Subject: GRÜN: Verhalten beim Abrufen von einer nicht existierenden Partition korrigiert X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=938d3c561da3596aa087afdfc0a31dc559866919;p=demos%2Fkafka%2Ftraining GRÜN: Verhalten beim Abrufen von einer nicht existierenden Partition korrigiert --- diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index 4d94569..26a1e81 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import de.juplo.kafka.exceptions.NonExistentPartitionException; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -203,6 +204,11 @@ public class DeadLetterConsumer implements Runnable } Mono requestRecord(int partition, long offset) { + if (partition >= numPartitions || partition < 0) + { + throw new NonExistentPartitionException(topic, partition); + } + CompletableFuture future = new CompletableFuture<>(); FetchRequest fetchRequest = new FetchRequest( diff --git a/src/main/java/de/juplo/kafka/DeadLetterController.java b/src/main/java/de/juplo/kafka/DeadLetterController.java index 568c610..ba59c30 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterController.java +++ b/src/main/java/de/juplo/kafka/DeadLetterController.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import de.juplo.kafka.exceptions.NonExistentPartitionException; import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; @@ -27,4 +28,10 @@ public class DeadLetterController public void notFound(OffsetOutOfRangeException e) { } + + @ResponseStatus(value= HttpStatus.BAD_REQUEST) + @ExceptionHandler(NonExistentPartitionException.class) + public void badRequest(NonExistentPartitionException e) + { + } } diff --git a/src/main/java/de/juplo/kafka/exceptions/NonExistentPartitionException.java b/src/main/java/de/juplo/kafka/exceptions/NonExistentPartitionException.java new file mode 100644 index 0000000..4e98a40 --- /dev/null +++ b/src/main/java/de/juplo/kafka/exceptions/NonExistentPartitionException.java @@ -0,0 +1,23 @@ +package de.juplo.kafka.exceptions; + +import lombok.Getter; +import org.apache.kafka.common.TopicPartition; + + +@Getter +public class NonExistentPartitionException extends RuntimeException +{ + private final TopicPartition partition; + + + public NonExistentPartitionException(String topic, int partition) + { + this(new TopicPartition(topic, partition)); + } + + public NonExistentPartitionException(TopicPartition partition) + { + super("Non-existent partition: " + partition); + this.partition = partition; + } +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 942bf69..600b6bf 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -82,7 +82,7 @@ public class ApplicationTests void testNonExistentPartition(int partition) { ResponseEntity response = restTemplate.getForEntity("/{partition}/0", String.class, partition); - assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value())); + assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.BAD_REQUEST.value())); }