`EndlessConsumer` auf `@KafkaHandler` umgestellt
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationErrorHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.consumer.ConsumerRecords;
7 import org.apache.kafka.common.TopicPartition;
8 import org.springframework.kafka.listener.CommonErrorHandler;
9 import org.springframework.kafka.listener.MessageListenerContainer;
10
11 import java.util.HashMap;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.Optional;
15
16
17 @Slf4j
18 public class ApplicationErrorHandler implements CommonErrorHandler
19 {
20   private Exception exception;
21   private boolean ack = true;
22
23   @Override
24   public boolean remainingRecords()
25   {
26     return true;
27   }
28
29   @Override
30   public void handleOtherException(
31     Exception thrownException,
32     Consumer<?, ?> consumer,
33     MessageListenerContainer container,
34     boolean batchListener)
35   {
36     rememberExceptionAndStopContainer(thrownException, container);
37   }
38
39   @Override
40   public void handleRemaining(
41     Exception thrownException,
42     List<ConsumerRecord<?, ?>> records,
43     Consumer<?, ?> consumer,
44     MessageListenerContainer container)
45   {
46     Map<TopicPartition, Long> offsets = new HashMap<>();
47     records.forEach(record ->
48       offsets.computeIfAbsent(
49           new TopicPartition(record.topic(), record.partition()),
50           offset -> record.offset()));
51     offsets.forEach((tp, offset) -> consumer.seek(tp, offset));
52     rememberExceptionAndStopContainer(thrownException, container);
53   }
54
55   @Override
56   public void handleBatch(
57     Exception thrownException,
58     ConsumerRecords<?, ?> data,
59     Consumer<?, ?> consumer,
60     MessageListenerContainer container,
61     Runnable invokeListener)
62   {
63     // Do not commit the polled offsets on a logic-error
64     ack = false;
65     rememberExceptionAndStopContainer(thrownException, container);
66   }
67
68   private void rememberExceptionAndStopContainer(
69       Exception exception,
70       MessageListenerContainer container)
71   {
72     log.error("{}, stopping container {} abnormally", exception, container);
73     this.exception = exception;
74     container.stopAbnormally(() -> log.info("{} is stopped", container));
75   }
76
77   @Override
78   public boolean isAckAfterHandle()
79   {
80     return ack;
81   }
82
83
84   public Optional<Exception> getException()
85   {
86     return Optional.ofNullable(exception);
87   }
88
89   public void clearState()
90   {
91     this.exception = null;
92     this.ack = true;
93   }
94 }