From: Kai Moritz <kai@juplo.de>
Date: Sun, 11 Sep 2022 12:19:29 +0000 (+0200)
Subject: @KafkaListener im Test ergänzt, der das DLT mit liest
X-Git-Tag: sumup-adder--springified---lvm-2-tage~4
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=e54f6036fecc54ea37001dae5f64d88502acfbe1;p=demos%2Fkafka%2Ftraining

@KafkaListener im Test ergänzt, der das DLT mit liest
---

diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java
new file mode 100644
index 00000000..a52a6a52
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java
@@ -0,0 +1,37 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+
+@Slf4j
+public class DeadLetterTopicConsumer
+{
+  List<Message<?>> messages = new LinkedList<>();
+
+
+  @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT")
+  public void receive(Message<?> message)
+  {
+    log.info(
+        "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
+        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
+        message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
+        message.getHeaders().get(KafkaHeaders.OFFSET),
+        message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
+        new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
+        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
+        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
+        message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
+        message.getPayload(),
+        new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
+
+    messages.add(message);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index b98066fc..d65dd8e7 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -396,5 +396,11 @@ abstract class GenericApplicationTests<K, V>
 		{
 			return factory.createConsumer();
 		}
+
+		@Bean
+		public DeadLetterTopicConsumer deadLetterTopicConsumer()
+		{
+			return new DeadLetterTopicConsumer();
+		}
 	}
 }