From: Kai Moritz Date: Sun, 11 Sep 2022 12:19:29 +0000 (+0200) Subject: @KafkaListener im Test ergänzt, der das DLT mit liest X-Git-Tag: sumup-adder--springified---lvm-2-tage~4 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e54f6036fecc54ea37001dae5f64d88502acfbe1;p=demos%2Fkafka%2Ftraining @KafkaListener im Test ergänzt, der das DLT mit liest --- diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java new file mode 100644 index 0000000..a52a6a5 --- /dev/null +++ b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java @@ -0,0 +1,37 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; + + +@Slf4j +public class DeadLetterTopicConsumer +{ + List> messages = new LinkedList<>(); + + + @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT") + public void receive(Message message) + { + log.info( + "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}", + message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC), + message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID), + message.getHeaders().get(KafkaHeaders.OFFSET), + message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY), + new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)), + ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(), + ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(), + message.getHeaders().get(KafkaHeaders.MESSAGE_KEY), + message.getPayload(), + new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class))); + + messages.add(message); + } +} diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index b98066f..d65dd8e 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -396,5 +396,11 @@ abstract class GenericApplicationTests { return factory.createConsumer(); } + + @Bean + public DeadLetterTopicConsumer deadLetterTopicConsumer() + { + return new DeadLetterTopicConsumer(); + } } }