import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
-import org.springframework.util.Assert;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
private Exception exception;
private boolean ack = true;
+ @Override
+ public boolean remainingRecords()
+ {
+ return true;
+ }
@Override
public void handleOtherException(
MessageListenerContainer container,
boolean batchListener)
{
- Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners");
+ rememberExceptionAndStopContainer(thrownException, container);
+ }
+
+ @Override
+ public void handleRemaining(
+ Exception thrownException,
+ List<ConsumerRecord<?, ?>> records,
+ Consumer<?, ?> consumer,
+ MessageListenerContainer container)
+ {
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ records.forEach(record ->
+ offsets.computeIfAbsent(
+ new TopicPartition(record.topic(), record.partition()),
+ offset -> record.offset()));
+ offsets.forEach((tp, offset) -> consumer.seek(tp, offset));
rememberExceptionAndStopContainer(thrownException, container);
}