1 package de.juplo.kafka;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

import java.util.List;
import java.util.Optional;
13 public class ApplicationErrorHandler extends CommonContainerStoppingErrorHandler
15 private Exception exception;
18 public synchronized Optional<Exception> getException()
20 return Optional.ofNullable(exception);
23 public synchronized void clearException()
25 this.exception = null;
30 public void handleOtherException(
31 Exception thrownException, Consumer<?, ?> consumer,
32 MessageListenerContainer container,
33 boolean batchListener)
35 this.exception = thrownException;
36 super.handleOtherException(thrownException, consumer, container, batchListener);
40 public void handleRemaining(
41 Exception thrownException,
42 List<ConsumerRecord<?, ?>> records,
43 Consumer<?, ?> consumer,
44 MessageListenerContainer container)
46 this.exception = thrownException;
47 super.handleRemaining(thrownException, records, consumer, container);
51 public void handleBatch(
52 Exception thrownException,
53 ConsumerRecords<?, ?> data,
54 Consumer<?, ?> consumer,
55 MessageListenerContainer container,
56 Runnable invokeListener)
58 this.exception = thrownException;
59 super.handleBatch(thrownException, data, consumer, container, invokeListener);