X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FDeadLetterTopicConsumer.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FDeadLetterTopicConsumer.java;h=a52a6a522e84e0bcfc92a7165720ac16703a9599;hb=e54f6036fecc54ea37001dae5f64d88502acfbe1;hp=0000000000000000000000000000000000000000;hpb=ac154bb18a6c575fe01e70cba6a86d10580dfb89;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java new file mode 100644 index 0000000..a52a6a5 --- /dev/null +++ b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java @@ -0,0 +1,37 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; + + +@Slf4j +public class DeadLetterTopicConsumer +{ + List> messages = new LinkedList<>(); + + + @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT") + public void receive(Message message) + { + log.info( + "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}", + message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC), + message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID), + message.getHeaders().get(KafkaHeaders.OFFSET), + message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY), + new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)), + ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(), + ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(), + message.getHeaders().get(KafkaHeaders.MESSAGE_KEY), + message.getPayload(), + new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class))); + + messages.add(message); + } +}