@KafkaListener im Test ergänzt, der das DLT mit liest
[demos/kafka/training] / src / test / java / de / juplo / kafka / DeadLetterTopicConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.springframework.kafka.annotation.KafkaListener;
5 import org.springframework.kafka.support.KafkaHeaders;
6 import org.springframework.messaging.Message;
7
8 import java.nio.ByteBuffer;
9 import java.util.LinkedList;
10 import java.util.List;
11
12
13 @Slf4j
14 public class DeadLetterTopicConsumer
15 {
16   List<Message<?>> messages = new LinkedList<>();
17
18
19   @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT")
20   public void receive(Message<?> message)
21   {
22     log.info(
23         "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
24         message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
25         message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
26         message.getHeaders().get(KafkaHeaders.OFFSET),
27         message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
28         new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
29         ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
30         ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
31         message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
32         message.getPayload(),
33         new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
34
35     messages.add(message);
36   }
37 }