Vereinfachte Version der auf Spring Kafka basierenden Implementierung
[demos/kafka/training] / src / test / java / de / juplo / kafka / DeadLetterTopicConsumer.java
diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java
deleted file mode 100644 (file)
index ac8c65d..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.Message;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-
-@Slf4j
-public class DeadLetterTopicConsumer
-{
-  List<Message<?>> messages = new LinkedList<>();
-
-
-  @KafkaListener(
-      id = "DLT",
-      topics = "${sumup.adder.topic}.DLT",
-      containerFactory = "dltContainerFactory")
-  public void receive(Message<?> message)
-  {
-    log.info(
-        "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
-        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
-        message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
-        message.getHeaders().get(KafkaHeaders.OFFSET),
-        message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
-        new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
-        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
-        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
-        message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
-        message.getPayload(),
-        new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
-
-    messages.add(message);
-  }
-}