import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
return factory.createConsumer();
}
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, String> dltContainerFactory(
+ KafkaProperties properties)
+ {
+ Map<String, Object> consumerProperties = new HashMap<>();
+
+ consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ DefaultKafkaConsumerFactory dltConsumerFactory =
+ new DefaultKafkaConsumerFactory<>(consumerProperties);
+ ConcurrentKafkaListenerContainerFactory<String, String> factory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(dltConsumerFactory);
+ return factory;
+ }
+
@Bean
public DeadLetterTopicConsumer deadLetterTopicConsumer()
{