From ffa654fe59c70f448c7af0079294139a2a7f8b4e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 9 Sep 2022 09:57:22 +0200 Subject: [PATCH] WIP:errors --- .../juplo/kafka/ApplicationConfiguration.java | 6 ++ .../juplo/kafka/ApplicationErrorHandler.java | 70 +++++++++++++++++++ 2 files changed, 76 insertions(+) create mode 100644 src/main/java/de/juplo/kafka/ApplicationErrorHandler.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index f8bf857..c6798f1 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -66,4 +66,10 @@ public class ApplicationConfiguration errorHandler, recordHandler); } + + @Bean + public ApplicationErrorHandler applicationErrorHandler() + { + return new ApplicationErrorHandler(); + } } diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java new file mode 100644 index 0000000..6e15717 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java @@ -0,0 +1,70 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.util.Assert; + +import java.util.Optional; + + +@Slf4j +public class ApplicationErrorHandler implements CommonErrorHandler +{ + private Exception exception; + private boolean ack = true; + + + @Override + public void handleOtherException( + Exception thrownException, + Consumer consumer, + MessageListenerContainer container, + boolean batchListener) + { + Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners"); + rememberExceptionAndStopContainer(thrownException, container); + } + + @Override + public void handleBatch( + Exception thrownException, + ConsumerRecords data, + Consumer consumer, + MessageListenerContainer container, + Runnable invokeListener) + { + // Do not commit the polled offsets on a logic-error + ack = false; + rememberExceptionAndStopContainer(thrownException, container); + } + + private void rememberExceptionAndStopContainer( + Exception exception, + MessageListenerContainer container) + { + log.error("{}, stopping container {} abnormally", exception, container); + this.exception = exception; + container.stopAbnormally(() -> log.info("{} is stopped", container)); + } + + @Override + public boolean isAckAfterHandle() + { + return ack; + } + + + public Optional getException() + { + return Optional.ofNullable(exception); + } + + public void clearState() + { + this.exception = null; + this.ack = true; + } +} -- 2.20.1