Auf `@KafkaHandler` umgestellt
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationErrorHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.clients.consumer.ConsumerRecords;
6 import org.springframework.kafka.listener.CommonErrorHandler;
7 import org.springframework.kafka.listener.MessageListenerContainer;
8 import org.springframework.util.Assert;
9
10 import java.util.Optional;
11
12
13 @Slf4j
14 public class ApplicationErrorHandler implements CommonErrorHandler
15 {
16   private Exception exception;
17   private boolean ack = true;
18
19
20   @Override
21   public void handleOtherException(
22     Exception thrownException,
23     Consumer<?, ?> consumer,
24     MessageListenerContainer container,
25     boolean batchListener)
26   {
27     Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners");
28     rememberExceptionAndStopContainer(thrownException, container);
29   }
30
31   @Override
32   public void handleBatch(
33     Exception thrownException,
34     ConsumerRecords<?, ?> data,
35     Consumer<?, ?> consumer,
36     MessageListenerContainer container,
37     Runnable invokeListener)
38   {
39     // Do not commit the polled offsets on a logic-error
40     ack = false;
41     rememberExceptionAndStopContainer(thrownException, container);
42   }
43
44   private void rememberExceptionAndStopContainer(
45       Exception exception,
46       MessageListenerContainer container)
47   {
48     log.error("{}, stopping container {} abnormally", exception, container);
49     this.exception = exception;
50     container.stopAbnormally(() -> log.info("{} is stopped", container));
51   }
52
53   @Override
54   public boolean isAckAfterHandle()
55   {
56     return ack;
57   }
58
59
60   public Optional<Exception> getException()
61   {
62     return Optional.ofNullable(exception);
63   }
64
65   public void clearState()
66   {
67     this.exception = null;
68     this.ack = true;
69   }
70 }