--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.springframework.kafka.listener.CommonErrorHandler;
+import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.util.Assert;
+
+import java.util.Optional;
+
+
+@Slf4j
+public class ApplicationErrorHandler implements CommonErrorHandler
+{
+ private Exception exception;
+ private boolean ack = true;
+
+
+ @Override
+ public void handleOtherException(
+ Exception thrownException,
+ Consumer<?, ?> consumer,
+ MessageListenerContainer container,
+ boolean batchListener)
+ {
+ Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners");
+ rememberExceptionAndStopContainer(thrownException, container);
+ }
+
+ @Override
+ public void handleBatch(
+ Exception thrownException,
+ ConsumerRecords<?, ?> data,
+ Consumer<?, ?> consumer,
+ MessageListenerContainer container,
+ Runnable invokeListener)
+ {
+ // Do not commit the polled offsets on a logic-error
+ ack = false;
+ rememberExceptionAndStopContainer(thrownException, container);
+ }
+
+ private void rememberExceptionAndStopContainer(
+ Exception exception,
+ MessageListenerContainer container)
+ {
+ log.error("{}, stopping container {} abnormally", exception, container);
+ this.exception = exception;
+ container.stopAbnormally(() -> log.info("{} is stopped", container));
+ }
+
+ @Override
+ public boolean isAckAfterHandle()
+ {
+ return ack;
+ }
+
+
+ public Optional<Exception> getException()
+ {
+ return Optional.ofNullable(exception);
+ }
+
+ public void clearState()
+ {
+ this.exception = null;
+ this.ack = true;
+ }
+}