Test repariert: Explizite Consumer-Konfiguration für DLT-Consumer
authorKai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 13:52:12 +0000 (15:52 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 19:54:16 +0000 (21:54 +0200)
* Der `DeadLetterTopicConsumer` wird jetzt explizit mit einem Consumer
  konfiguriert, der alle Nachrichten schlicht als `String` deserialisiert.
* Dadurch kommt es beim Einlesen der Nachrichten nicht mehr zu Fehlern
  und der Test arbeitet wie erwartet.
* Die Definitionen des Type-Mappings für den Producer werden daher
  nicht mehr benötigt.

src/main/resources/application.yml
src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 0bc592c..2f6f859 100644 (file)
@@ -44,10 +44,6 @@ spring:
           CALC:de.juplo.kafka.MessageCalculateSum
     producer:
       bootstrap-servers: :9092
-      properties:
-        spring.json.type.mapping: >
-          ADD:de.juplo.kafka.MessageAddNumber,
-          CALC:de.juplo.kafka.MessageCalculateSum
 logging:
   level:
     root: INFO
index a52a6a5..ac8c65d 100644 (file)
@@ -16,7 +16,10 @@ public class DeadLetterTopicConsumer
   List<Message<?>> messages = new LinkedList<>();
 
 
-  @KafkaListener(id = "DLT", topics = "${sumup.adder.topic}.DLT")
+  @KafkaListener(
+      id = "DLT",
+      topics = "${sumup.adder.topic}.DLT",
+      containerFactory = "dltContainerFactory")
   public void receive(Message<?> message)
   {
     log.info(
index 003a178..ac8a629 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -18,8 +19,10 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@ -409,6 +412,25 @@ abstract class GenericApplicationTests<K, V>
                        return factory.createConsumer();
                }
 
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> dltContainerFactory(
+      KafkaProperties properties)
+    {
+      Map<String, Object> consumerProperties = new HashMap<>();
+
+      consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+      consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+      consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+      consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+      DefaultKafkaConsumerFactory dltConsumerFactory =
+        new DefaultKafkaConsumerFactory<>(consumerProperties);
+      ConcurrentKafkaListenerContainerFactory<String, String> factory =
+        new ConcurrentKafkaListenerContainerFactory<>();
+      factory.setConsumerFactory(dltConsumerFactory);
+      return factory;
+    }
+
                @Bean
                public DeadLetterTopicConsumer deadLetterTopicConsumer()
                {