From: Kai Moritz Date: Sun, 11 Sep 2022 13:52:12 +0000 (+0200) Subject: Test repariert: Explizite Consumer-Konfiguration für DLT-Consumer X-Git-Tag: sumup-adder--springified---lvm-2-tage~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=62f4cd9765ce308804bdf1ced3fab02d7c7eac2e;p=demos%2Fkafka%2Ftraining Test repariert: Explizite Consumer-Konfiguration für DLT-Consumer * Der `DeadLetterTopicConsumer` wird jetzt explizit mit einem Consumer konfiguriert, der alle Nachrichten schlicht als `String` deserialisiert. * Dadurch kommt es beim Einlesen der Nachrichten nicht mehr zu Fehlern und der Test arbeitet wie erwartet. * Die Definitionen des Type-Mappings für den Producer werden daher nicht mehr benötigt. --- diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0bc592c..2f6f859 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -44,10 +44,6 @@ spring: CALC:de.juplo.kafka.MessageCalculateSum producer: bootstrap-servers: :9092 - properties: - spring.json.type.mapping: > - ADD:de.juplo.kafka.MessageAddNumber, - CALC:de.juplo.kafka.MessageCalculateSum logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java index a52a6a5..ac8c65d 100644 --- a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java +++ b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java @@ -16,7 +16,10 @@ public class DeadLetterTopicConsumer List> messages = new LinkedList<>(); - @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT") + @KafkaListener( + id = "DLT", + topics = "${sumup.adder.topic}.DLT", + containerFactory = "dltContainerFactory") public void receive(Message message) { log.info( diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 003a178..ac8a629 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -18,8 +19,10 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -409,6 +412,25 @@ abstract class GenericApplicationTests return factory.createConsumer(); } + @Bean + public ConcurrentKafkaListenerContainerFactory dltContainerFactory( + KafkaProperties properties) + { + Map consumerProperties = new HashMap<>(); + + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + DefaultKafkaConsumerFactory dltConsumerFactory = + new DefaultKafkaConsumerFactory<>(consumerProperties); + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(dltConsumerFactory); + return factory; + } + @Bean public DeadLetterTopicConsumer deadLetterTopicConsumer() {