GRÜN: Verhalten beim Abrufen von einer nicht existierenden Partition korrigiert
authorKai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 17:48:24 +0000 (19:48 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 21 May 2025 18:14:13 +0000 (20:14 +0200)
src/main/java/de/juplo/kafka/DeadLetterConsumer.java
src/main/java/de/juplo/kafka/DeadLetterController.java
src/main/java/de/juplo/kafka/exceptions/NonExistentPartitionException.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/ApplicationTests.java

index 4d94569..26a1e81 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import de.juplo.kafka.exceptions.NonExistentPartitionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -203,6 +204,11 @@ public class DeadLetterConsumer implements Runnable
   }
   Mono<String> requestRecord(int partition, long offset)
   {
+    if (partition >= numPartitions || partition < 0)
+    {
+      throw new NonExistentPartitionException(topic, partition);
+    }
+
     CompletableFuture<String> future = new CompletableFuture<>();
 
     FetchRequest fetchRequest = new FetchRequest(
index 568c610..ba59c30 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import de.juplo.kafka.exceptions.NonExistentPartitionException;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
@@ -27,4 +28,10 @@ public class DeadLetterController
   public void notFound(OffsetOutOfRangeException e)
   {
   }
+
+  @ResponseStatus(value= HttpStatus.BAD_REQUEST)
+  @ExceptionHandler(NonExistentPartitionException.class)
+  public void badRequest(NonExistentPartitionException e)
+  {
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/exceptions/NonExistentPartitionException.java b/src/main/java/de/juplo/kafka/exceptions/NonExistentPartitionException.java
new file mode 100644 (file)
index 0000000..4e98a40
--- /dev/null
@@ -0,0 +1,23 @@
+package de.juplo.kafka.exceptions;
+
+import lombok.Getter;
+import org.apache.kafka.common.TopicPartition;
+
+
+@Getter
+public class NonExistentPartitionException extends RuntimeException
+{
+  private final TopicPartition partition;
+
+
+  public NonExistentPartitionException(String topic, int partition)
+  {
+    this(new TopicPartition(topic, partition));
+  }
+
+  public NonExistentPartitionException(TopicPartition partition)
+  {
+    super("Non-existent partition: " + partition);
+    this.partition = partition;
+  }
+}
index 942bf69..600b6bf 100644 (file)
@@ -82,7 +82,7 @@ public class ApplicationTests
   void testNonExistentPartition(int partition)
   {
     ResponseEntity<String> response = restTemplate.getForEntity("/{partition}/0", String.class, partition);
-    assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value()));
+    assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.BAD_REQUEST.value()));
   }