-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
-import org.springframework.kafka.listener.MessageListenerContainer;
-
-import java.util.List;
-import java.util.Optional;
-
-
-public class ApplicationErrorHandler extends CommonContainerStoppingErrorHandler
-{
- private Exception exception;
-
-
- public synchronized Optional<Exception> getException()
- {
- return Optional.ofNullable(exception);
- }
-
- public synchronized void clearException()
- {
- this.exception = null;
- }
-
-
- @Override
- public void handleOtherException(
- Exception thrownException, Consumer<?, ?> consumer,
- MessageListenerContainer container,
- boolean batchListener)
- {
- this.exception = thrownException;
- super.handleOtherException(thrownException, consumer, container, batchListener);
- }
-
- @Override
- public void handleRemaining(
- Exception thrownException,
- List<ConsumerRecord<?, ?>> records,
- Consumer<?, ?> consumer,
- MessageListenerContainer container)
- {
- this.exception = thrownException;
- super.handleRemaining(thrownException, records, consumer, container);
- }
-
- @Override
- public void handleBatch(
- Exception thrownException,
- ConsumerRecords<?, ?> data,
- Consumer<?, ?> consumer,
- MessageListenerContainer container,
- Runnable invokeListener)
- {
- this.exception = thrownException;
- super.handleBatch(thrownException, data, consumer, container, invokeListener);
- }
-}