@KafkaListener im Test ergänzt, der das DLT mit liest
authorKai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 12:19:29 +0000 (14:19 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 19:13:21 +0000 (21:13 +0200)
src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/GenericApplicationTests.java

diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java
new file mode 100644 (file)
index 0000000..a52a6a5
--- /dev/null
@@ -0,0 +1,37 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+
+@Slf4j
+public class DeadLetterTopicConsumer
+{
+  List<Message<?>> messages = new LinkedList<>();
+
+
+  @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT")
+  public void receive(Message<?> message)
+  {
+    log.info(
+        "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
+        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
+        message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
+        message.getHeaders().get(KafkaHeaders.OFFSET),
+        message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
+        new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
+        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
+        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
+        message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
+        message.getPayload(),
+        new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
+
+    messages.add(message);
+  }
+}
index b98066f..d65dd8e 100644 (file)
@@ -396,5 +396,11 @@ abstract class GenericApplicationTests<K, V>
                {
                        return factory.createConsumer();
                }
+
+               @Bean
+               public DeadLetterTopicConsumer deadLetterTopicConsumer()
+               {
+                       return new DeadLetterTopicConsumer();
+               }
        }
 }