+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.Message;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-
-@Slf4j
-public class DeadLetterTopicConsumer
-{
- List<Message<?>> messages = new LinkedList<>();
-
-
- @KafkaListener(
- id = "DLT",
- topics = "${sumup.adder.topic}.DLT",
- containerFactory = "dltContainerFactory")
- public void receive(Message<?> message)
- {
- log.info(
- "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
- message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
- message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
- message.getHeaders().get(KafkaHeaders.OFFSET),
- message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
- new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
- ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
- ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
- message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
- message.getPayload(),
- new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
-
- messages.add(message);
- }
-}