--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+
+@Slf4j
+public class DeadLetterTopicConsumer
+{
+ List<Message<?>> messages = new LinkedList<>();
+
+
+ @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT")
+ public void receive(Message<?> message)
+ {
+ log.info(
+ "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
+ message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
+ message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
+ message.getHeaders().get(KafkaHeaders.OFFSET),
+ message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
+ new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
+ ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
+ ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
+ message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
+ message.getPayload(),
+ new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
+
+ messages.add(message);
+ }
+}