X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationErrorHandler.java;h=52c6a0c2fdc936bba94006456446e32380b13b54;hb=0c9a0c1cf9a0065012743efcd940d8721bc33c20;hp=6e157173272bb629fc317c1aceb3033cbb2fedc7;hpb=f095f71a104fcde025a63f87ba75eb5cb3136656;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java index 6e15717..52c6a0c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java @@ -2,11 +2,15 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.MessageListenerContainer; -import org.springframework.util.Assert; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; @@ -16,6 +20,11 @@ public class ApplicationErrorHandler implements CommonErrorHandler private Exception exception; private boolean ack = true; + @Override + public boolean remainingRecords() + { + return true; + } @Override public void handleOtherException( @@ -24,7 +33,22 @@ public class ApplicationErrorHandler implements CommonErrorHandler MessageListenerContainer container, boolean batchListener) { - Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners"); + rememberExceptionAndStopContainer(thrownException, container); + } + + @Override + public void handleRemaining( + Exception thrownException, + List> records, + Consumer consumer, + MessageListenerContainer container) + { + Map offsets = new HashMap<>(); + records.forEach(record -> + offsets.computeIfAbsent( + new TopicPartition(record.topic(), record.partition()), + offset -> record.offset())); + offsets.forEach((tp, offset) -> consumer.seek(tp, offset)); rememberExceptionAndStopContainer(thrownException, container); }