WIP:errors
authorKai Moritz <kai@juplo.de>
Fri, 9 Sep 2022 07:57:22 +0000 (09:57 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 07:49:04 +0000 (09:49 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationErrorHandler.java [new file with mode: 0644]

index f8bf857..c6798f1 100644 (file)
@@ -66,4 +66,10 @@ public class ApplicationConfiguration
             errorHandler,
             recordHandler);
   }
+
+  @Bean
+  public ApplicationErrorHandler applicationErrorHandler()
+  {
+    return new ApplicationErrorHandler();
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java
new file mode 100644 (file)
index 0000000..6e15717
--- /dev/null
@@ -0,0 +1,70 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.springframework.kafka.listener.CommonErrorHandler;
+import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.util.Assert;
+
+import java.util.Optional;
+
+
+@Slf4j
+public class ApplicationErrorHandler implements CommonErrorHandler
+{
+  private Exception exception;
+  private boolean ack = true;
+
+
+  @Override
+  public void handleOtherException(
+    Exception thrownException,
+    Consumer<?, ?> consumer,
+    MessageListenerContainer container,
+    boolean batchListener)
+  {
+    Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners");
+    rememberExceptionAndStopContainer(thrownException, container);
+  }
+
+  @Override
+  public void handleBatch(
+    Exception thrownException,
+    ConsumerRecords<?, ?> data,
+    Consumer<?, ?> consumer,
+    MessageListenerContainer container,
+    Runnable invokeListener)
+  {
+    // Do not commit the polled offsets on a logic-error
+    ack = false;
+    rememberExceptionAndStopContainer(thrownException, container);
+  }
+
+  private void rememberExceptionAndStopContainer(
+      Exception exception,
+      MessageListenerContainer container)
+  {
+    log.error("{}, stopping container {} abnormally", exception, container);
+    this.exception = exception;
+    container.stopAbnormally(() -> log.info("{} is stopped", container));
+  }
+
+  @Override
+  public boolean isAckAfterHandle()
+  {
+    return ack;
+  }
+
+
+  public Optional<Exception> getException()
+  {
+    return Optional.ofNullable(exception);
+  }
+
+  public void clearState()
+  {
+    this.exception = null;
+    this.ack = true;
+  }
+}