1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.springframework.kafka.annotation.KafkaListener;
5 import org.springframework.kafka.support.KafkaHeaders;
6 import org.springframework.messaging.Message;
8 import java.nio.ByteBuffer;
9 import java.util.LinkedList;
10 import java.util.List;
14 public class DeadLetterTopicConsumer
16 List<Message<?>> messages = new LinkedList<>();
21 topics = "${sumup.adder.topic}.DLT",
22 containerFactory = "dltContainerFactory")
23 public void receive(Message<?> message)
26 "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
27 message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
28 message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
29 message.getHeaders().get(KafkaHeaders.OFFSET),
30 message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
31 new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
32 ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
33 ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
34 message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
36 new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
38 messages.add(message);