Service ergänzt, der das Dead-Letter-Topic ausliest
[demos/kafka/training] / src / test / java / de / juplo / kafka / DeadLetterTopicConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.springframework.kafka.annotation.KafkaListener;
5 import org.springframework.kafka.support.KafkaHeaders;
6 import org.springframework.messaging.Message;
7
8 import java.nio.ByteBuffer;
9 import java.util.LinkedList;
10 import java.util.List;
11
12
13 @Slf4j
14 public class DeadLetterTopicConsumer
15 {
16   List<Message<?>> messages = new LinkedList<>();
17
18
19   @KafkaListener(
20       id = "DLT",
21       topics = "${sumup.adder.topic}.DLT",
22       containerFactory = "dltContainerFactory")
23   public void receive(Message<?> message)
24   {
25     log.info(
26         "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
27         message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
28         message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
29         message.getHeaders().get(KafkaHeaders.OFFSET),
30         message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
31         new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
32         ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
33         ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
34         message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
35         message.getPayload(),
36         new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
37
38     messages.add(message);
39   }
40 }