Test repariert: Explizite Consumer-Konfiguration für DLT-Consumer
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
index 003a178..ac8a629 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -18,8 +19,10 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@ -409,6 +412,25 @@ abstract class GenericApplicationTests<K, V>
                        return factory.createConsumer();
                }
 
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> dltContainerFactory(
+      KafkaProperties properties)
+    {
+      Map<String, Object> consumerProperties = new HashMap<>();
+
+      consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+      consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+      consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+      consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+      DefaultKafkaConsumerFactory dltConsumerFactory =
+        new DefaultKafkaConsumerFactory<>(consumerProperties);
+      ConcurrentKafkaListenerContainerFactory<String, String> factory =
+        new ConcurrentKafkaListenerContainerFactory<>();
+      factory.setConsumerFactory(dltConsumerFactory);
+      return factory;
+    }
+
                @Bean
                public DeadLetterTopicConsumer deadLetterTopicConsumer()
                {