From e54f6036fecc54ea37001dae5f64d88502acfbe1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 11 Sep 2022 14:19:29 +0200 Subject: [PATCH] =?utf8?q?@KafkaListener=20im=20Test=20erg=C3=A4nzt,=20der?= =?utf8?q?=20das=20DLT=20mit=20liest?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/DeadLetterTopicConsumer.java | 37 +++++++++++++++++++ .../juplo/kafka/GenericApplicationTests.java | 6 +++ 2 files changed, 43 insertions(+) create mode 100644 src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java new file mode 100644 index 0000000..a52a6a5 --- /dev/null +++ b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java @@ -0,0 +1,37 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; + + +@Slf4j +public class DeadLetterTopicConsumer +{ + List> messages = new LinkedList<>(); + + + @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT") + public void receive(Message message) + { + log.info( + "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}", + message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC), + message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID), + message.getHeaders().get(KafkaHeaders.OFFSET), + message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY), + new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)), + ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(), + ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(), + message.getHeaders().get(KafkaHeaders.MESSAGE_KEY), + message.getPayload(), + new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class))); + + messages.add(message); + } +} diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index b98066f..d65dd8e 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -396,5 +396,11 @@ abstract class GenericApplicationTests { return factory.createConsumer(); } + + @Bean + public DeadLetterTopicConsumer deadLetterTopicConsumer() + { + return new DeadLetterTopicConsumer(); + } } } -- 2.20.1