From 62f4cd9765ce308804bdf1ced3fab02d7c7eac2e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 11 Sep 2022 15:52:12 +0200 Subject: [PATCH] =?utf8?q?Test=20repariert:=20Explizite=20Consumer-Konfigu?= =?utf8?q?ration=20f=C3=BCr=20DLT-Consumer?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Der `DeadLetterTopicConsumer` wird jetzt explizit mit einem Consumer konfiguriert, der alle Nachrichten schlicht als `String` deserialisiert. * Dadurch kommt es beim Einlesen der Nachrichten nicht mehr zu Fehlern und der Test arbeitet wie erwartet. * Die Definitionen des Type-Mappings für den Producer werden daher nicht mehr benötigt. --- src/main/resources/application.yml | 4 ---- .../juplo/kafka/DeadLetterTopicConsumer.java | 5 ++++- .../juplo/kafka/GenericApplicationTests.java | 22 +++++++++++++++++++ 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0bc592c..2f6f859 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -44,10 +44,6 @@ spring: CALC:de.juplo.kafka.MessageCalculateSum producer: bootstrap-servers: :9092 - properties: - spring.json.type.mapping: > - ADD:de.juplo.kafka.MessageAddNumber, - CALC:de.juplo.kafka.MessageCalculateSum logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java index a52a6a5..ac8c65d 100644 --- a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java +++ b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java @@ -16,7 +16,10 @@ public class DeadLetterTopicConsumer List> messages = new LinkedList<>(); - @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT") + @KafkaListener( + id = "DLT", + topics = "${sumup.adder.topic}.DLT", + containerFactory = "dltContainerFactory") public void receive(Message message) { log.info( diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 003a178..ac8a629 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -18,8 +19,10 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -409,6 +412,25 @@ abstract class GenericApplicationTests return factory.createConsumer(); } + @Bean + public ConcurrentKafkaListenerContainerFactory dltContainerFactory( + KafkaProperties properties) + { + Map consumerProperties = new HashMap<>(); + + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + DefaultKafkaConsumerFactory dltConsumerFactory = + new DefaultKafkaConsumerFactory<>(consumerProperties); + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(dltConsumerFactory); + return factory; + } + @Bean public DeadLetterTopicConsumer deadLetterTopicConsumer() { -- 2.20.1