1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.consumer.ConsumerRecords;
7 import org.apache.kafka.common.TopicPartition;
8 import org.springframework.kafka.listener.CommonErrorHandler;
9 import org.springframework.kafka.listener.MessageListenerContainer;
11 import java.util.HashMap;
12 import java.util.List;
14 import java.util.Optional;
/**
 * {@code CommonErrorHandler} implementation that remembers the exception it was handed
 * and stops the listener container abnormally (see
 * {@code rememberExceptionAndStopContainer}).
 */
18 public class ApplicationErrorHandler implements CommonErrorHandler
/**
 * Exception stored by {@code rememberExceptionAndStopContainer}; exposed through
 * {@code getException()} and reset to {@code null} by {@code clearState()}.
 */
20 private Exception exception;
/**
 * Starts out {@code true}.
 * NOTE(review): presumably this is the value {@code isAckAfterHandle()} reports to the
 * container - confirm against the full file.
 */
21 private boolean ack = true;
/**
 * NOTE(review): the body of this method is not part of this excerpt, so its return
 * value cannot be documented here. Presumably it tells the listener container whether
 * {@code handleRemaining(...)} wants to receive the not yet processed records -
 * confirm against the full file and the {@code CommonErrorHandler} contract.
 */
24 public boolean remainingRecords()
30 public void handleOtherException(
31 Exception thrownException,
32 Consumer<?, ?> consumer,
33 MessageListenerContainer container,
34 boolean batchListener)
36 rememberExceptionAndStopContainer(thrownException, container);
40 public void handleRemaining(
41 Exception thrownException,
42 List<ConsumerRecord<?, ?>> records,
43 Consumer<?, ?> consumer,
44 MessageListenerContainer container)
46 Map<TopicPartition, Long> offsets = new HashMap<>();
47 records.forEach(record ->
48 offsets.computeIfAbsent(
49 new TopicPartition(record.topic(), record.partition()),
50 offset -> record.offset()));
51 offsets.forEach((tp, offset) -> consumer.seek(tp, offset));
52 rememberExceptionAndStopContainer(thrownException, container);
56 public void handleBatch(
57 Exception thrownException,
58 ConsumerRecords<?, ?> data,
59 Consumer<?, ?> consumer,
60 MessageListenerContainer container,
61 Runnable invokeListener)
63 // Do not commit the polled offsets on a logic-error
65 rememberExceptionAndStopContainer(thrownException, container);
68 private void rememberExceptionAndStopContainer(
70 MessageListenerContainer container)
72 log.error("{}, stopping container {} abnormally", exception, container);
73 this.exception = exception;
74 container.stopAbnormally(() -> log.info("{} is stopped", container));
/**
 * NOTE(review): the body of this method is not part of this excerpt. Presumably it
 * returns the {@code ack} field, which the container would consult to decide whether
 * offsets are committed after an error was handled - confirm against the full file.
 */
78 public boolean isAckAfterHandle()
84 public Optional<Exception> getException()
86 return Optional.ofNullable(exception);
/**
 * Forgets the stored exception, so that {@code getException()} yields an empty
 * {@code Optional} again.
 * NOTE(review): this excerpt ends right after the first statement; the method may
 * reset further state (e.g. the {@code ack} flag) - confirm against the full file.
 */
89 public void clearState()
91 this.exception = null;