X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=ac8a6298fc17edc6f790f2ea58a292ed2231fe91;hb=62f4cd9765ce308804bdf1ced3fab02d7c7eac2e;hp=003a1780386cb5d139dee5a3154f56d7134f77f2;hpb=803f73e0c0d6bf65e5887e7bfa5c45d8a5c29abc;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 003a178..ac8a629 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -18,8 +19,10 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -409,6 +412,25 @@ abstract class GenericApplicationTests return factory.createConsumer(); } + @Bean + public ConcurrentKafkaListenerContainerFactory dltContainerFactory( + KafkaProperties properties) + { + Map consumerProperties = new HashMap<>(); + + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + DefaultKafkaConsumerFactory dltConsumerFactory = + new DefaultKafkaConsumerFactory<>(consumerProperties); + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(dltConsumerFactory); + return factory; + } + @Bean public DeadLetterTopicConsumer deadLetterTopicConsumer() {