@KafkaListener im Test ergänzt, der das DLT mit liest
[demos/kafka/training] / src / test / java / de / juplo / kafka / DeadLetterTopicConsumer.java
diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java
new file mode 100644 (file)
index 0000000..a52a6a5
--- /dev/null
@@ -0,0 +1,37 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+
+@Slf4j
+public class DeadLetterTopicConsumer
+{
+  List<Message<?>> messages = new LinkedList<>();
+
+
+  @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT")
+  public void receive(Message<?> message)
+  {
+    log.info(
+        "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
+        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
+        message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
+        message.getHeaders().get(KafkaHeaders.OFFSET),
+        message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
+        new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
+        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
+        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
+        message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
+        message.getPayload(),
+        new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
+
+    messages.add(message);
+  }
+}