package de.juplo.kafka;
+import de.juplo.kafka.exceptions.NonExistentPartitionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
}
Mono<String> requestRecord(int partition, long offset)
{
+ if (partition >= numPartitions || partition < 0)
+ {
+ throw new NonExistentPartitionException(topic, partition);
+ }
+
CompletableFuture<String> future = new CompletableFuture<>();
FetchRequest fetchRequest = new FetchRequest(
package de.juplo.kafka;
+import de.juplo.kafka.exceptions.NonExistentPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
public void notFound(OffsetOutOfRangeException e)
{
}
+
+ @ResponseStatus(value= HttpStatus.BAD_REQUEST)
+ @ExceptionHandler(NonExistentPartitionException.class)
+ public void badRequest(NonExistentPartitionException e)
+ {
+ }
}
--- /dev/null
+package de.juplo.kafka.exceptions;
+
+import lombok.Getter;
+import org.apache.kafka.common.TopicPartition;
+
+
+@Getter
+public class NonExistentPartitionException extends RuntimeException
+{
+ private final TopicPartition partition;
+
+
+ public NonExistentPartitionException(String topic, int partition)
+ {
+ this(new TopicPartition(topic, partition));
+ }
+
+ public NonExistentPartitionException(TopicPartition partition)
+ {
+ super("Non-existent partition: " + partition);
+ this.partition = partition;
+ }
+}
void testNonExistentPartition(int partition)
{
ResponseEntity<String> response = restTemplate.getForEntity("/{partition}/0", String.class, partition);
- assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value()));
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.BAD_REQUEST.value()));
}