1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.clients.consumer.ConsumerRecords;
6 import org.springframework.kafka.listener.CommonErrorHandler;
7 import org.springframework.kafka.listener.MessageListenerContainer;
8 import org.springframework.util.Assert;
10 import java.util.Optional;
14 public class ApplicationErrorHandler implements CommonErrorHandler
16 private Exception exception;
17 private boolean ack = true;
21 public void handleOtherException(
22 Exception thrownException,
23 Consumer<?, ?> consumer,
24 MessageListenerContainer container,
25 boolean batchListener)
27 Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners");
28 rememberExceptionAndStopContainer(thrownException, container);
32 public void handleBatch(
33 Exception thrownException,
34 ConsumerRecords<?, ?> data,
35 Consumer<?, ?> consumer,
36 MessageListenerContainer container,
37 Runnable invokeListener)
39 // Do not commit the polled offsets on a logic-error
41 rememberExceptionAndStopContainer(thrownException, container);
44 private void rememberExceptionAndStopContainer(
46 MessageListenerContainer container)
48 log.error("{}, stopping container {} abnormally", exception, container);
49 this.exception = exception;
50 container.stopAbnormally(() -> log.info("{} is stopped", container));
54 public boolean isAckAfterHandle()
60 public Optional<Exception> getException()
62 return Optional.ofNullable(exception);
65 public void clearState()
67 this.exception = null;