1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.springframework.kafka.annotation.KafkaListener;
5 import org.springframework.kafka.support.KafkaHeaders;
6 import org.springframework.messaging.Message;
8 import java.nio.ByteBuffer;
9 import java.util.LinkedList;
10 import java.util.List;
14 public class DeadLetterTopicConsumer
16 List<Message<?>> messages = new LinkedList<>();
19 @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT")
20 public void receive(Message<?> message)
23 "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
24 message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
25 message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
26 message.getHeaders().get(KafkaHeaders.OFFSET),
27 message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
28 new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
29 ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
30 ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
31 message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
33 new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
35 messages.add(message);