import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Mono;
 
           consumer.resume(partitionsToResume);
         }
+        catch (OffsetOutOfRangeException e)
+        {
+          List<TopicPartition> partitionsToPause = new LinkedList<>();
+
+          e.offsetOutOfRangePartitions().forEach((topicPartition, offset) ->
+          {
+            FetchRequest failedFetchRequest = currentFetchRequest[topicPartition.partition()];
+            log.error("{} - {} does not yet exist: {}", id, failedFetchRequest, e.toString());
+            failedFetchRequest.future().completeExceptionally(e);
+            Optional<FetchRequest> fetchRequestMono = schedulePendingFetchRequest(topicPartition.partition(), true);
+            if (fetchRequestMono.isEmpty())
+            {
+              partitionsToPause.add(topicPartition);
+            }
+          });
+
+          consumer.pause(partitionsToPause);
+        }
       }
     }
     catch(Exception e)
     }
   }
 
+
   private Optional<FetchRequest> schedulePendingFetchRequest(int partition)
   {
-    if (currentFetchRequest[partition] == null)
+    return schedulePendingFetchRequest(partition, false);
+  }
+
+  private Optional<FetchRequest> schedulePendingFetchRequest(int partition, boolean force)
+  {
+    if (force || currentFetchRequest[partition] == null)
     {
       FetchRequest nextFetchRequest = pendingFetchRequests[partition].poll();
       if (nextFetchRequest != null)
       else
       {
         log.trace("{} - no pending fetch-request for partition {}.", id, partition);
+        currentFetchRequest[partition] = null;
       }
     }
     else
 
 package de.juplo.kafka;
 
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Mono;
 
 
   {
     return deadLetterConsumer.requestRecord(partition, offset);
   }
+
+  @ResponseStatus(value= HttpStatus.NOT_FOUND)
+  @ExceptionHandler(OffsetOutOfRangeException.class)
+  public void notFound(OffsetOutOfRangeException e)
+  {
+  }
 }