X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationErrorHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationErrorHandler.java;h=6e157173272bb629fc317c1aceb3033cbb2fedc7;hb=f095f71a104fcde025a63f87ba75eb5cb3136656;hp=0000000000000000000000000000000000000000;hpb=2eb3c45c9438a20777b0110defa593dd45c64511;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java new file mode 100644 index 0000000..6e15717 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java @@ -0,0 +1,70 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.util.Assert; + +import java.util.Optional; + + +@Slf4j +public class ApplicationErrorHandler implements CommonErrorHandler +{ + private Exception exception; + private boolean ack = true; + + + @Override + public void handleOtherException( + Exception thrownException, + Consumer consumer, + MessageListenerContainer container, + boolean batchListener) + { + Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners"); + rememberExceptionAndStopContainer(thrownException, container); + } + + @Override + public void handleBatch( + Exception thrownException, + ConsumerRecords data, + Consumer consumer, + MessageListenerContainer container, + Runnable invokeListener) + { + // Do not commit the polled offsets on a logic-error + ack = false; + rememberExceptionAndStopContainer(thrownException, container); + } + + private void rememberExceptionAndStopContainer( + Exception exception, + MessageListenerContainer container) + { + log.error("{}, stopping container {} abnormally", exception, container); + this.exception = exception; + container.stopAbnormally(() -> log.info("{} is stopped", container)); + } + + @Override + public boolean isAckAfterHandle() + { + return ack; + } + + + public Optional getException() + { + return Optional.ofNullable(exception); + } + + public void clearState() + { + this.exception = null; + this.ack = true; + } +}