273f509c6e2de14798bf8d02a4c8d3d382f5b3a9
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationErrorHandler.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.consumer.Consumer;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.ConsumerRecords;
6 import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
7 import org.springframework.kafka.listener.MessageListenerContainer;
8
9 import java.util.List;
10 import java.util.Optional;
11
12
13 public class ApplicationErrorHandler extends CommonContainerStoppingErrorHandler
14 {
15   private Exception exception;
16
17
18   public synchronized Optional<Exception> getException()
19   {
20     return Optional.ofNullable(exception);
21   }
22
23   public synchronized void clearException()
24   {
25     this.exception = null;
26   }
27
28
29   @Override
30   public void handleOtherException(
31       Exception thrownException, Consumer<?, ?> consumer,
32       MessageListenerContainer container,
33       boolean batchListener)
34   {
35     this.exception = thrownException;
36     super.handleOtherException(thrownException, consumer, container, batchListener);
37   }
38
39   @Override
40   public void handleRemaining(
41       Exception thrownException,
42       List<ConsumerRecord<?, ?>> records,
43       Consumer<?, ?> consumer,
44       MessageListenerContainer container)
45   {
46     this.exception = thrownException;
47     super.handleRemaining(thrownException, records, consumer, container);
48   }
49
50   @Override
51   public void handleBatch(
52       Exception thrownException,
53       ConsumerRecords<?, ?> data,
54       Consumer<?, ?> consumer,
55       MessageListenerContainer container,
56       Runnable invokeListener)
57   {
58     this.exception = thrownException;
59     super.handleBatch(thrownException, data, consumer, container, invokeListener);
60   }
61 }