import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Mono;
consumer.resume(partitionsToResume);
}
+ catch (OffsetOutOfRangeException e)
+ {
+ List<TopicPartition> partitionsToPause = new LinkedList<>();
+
+ e.offsetOutOfRangePartitions().forEach((topicPartition, offset) ->
+ {
+ FetchRequest failedFetchRequest = currentFetchRequest[topicPartition.partition()];
+ log.error("{} - {} does not yet exist: {}", id, failedFetchRequest, e.toString());
+ failedFetchRequest.future().completeExceptionally(e);
+ Optional<FetchRequest> fetchRequestMono = schedulePendingFetchRequest(topicPartition.partition(), true);
+ if (fetchRequestMono.isEmpty())
+ {
+ partitionsToPause.add(topicPartition);
+ }
+ });
+
+ consumer.pause(partitionsToPause);
+ }
}
}
catch(Exception e)
}
}
+
private Optional<FetchRequest> schedulePendingFetchRequest(int partition)
{
- if (currentFetchRequest[partition] == null)
+ return schedulePendingFetchRequest(partition, false);
+ }
+
+ private Optional<FetchRequest> schedulePendingFetchRequest(int partition, boolean force)
+ {
+ if (force || currentFetchRequest[partition] == null)
{
FetchRequest nextFetchRequest = pendingFetchRequests[partition].poll();
if (nextFetchRequest != null)
else
{
log.trace("{} - no pending fetch-request for partition {}.", id, partition);
+ currentFetchRequest[partition] = null;
}
}
else
package de.juplo.kafka;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
{
return deadLetterConsumer.requestRecord(partition, offset);
}
+
+ @ResponseStatus(value= HttpStatus.NOT_FOUND)
+ @ExceptionHandler(OffsetOutOfRangeException.class)
+ public void notFound(OffsetOutOfRangeException e)
+ {
+ }
}