+ Map<String, Object> consumerProperties = new HashMap<>();
+
+ consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ DefaultKafkaConsumerFactory dltConsumerFactory =
+ new DefaultKafkaConsumerFactory<>(consumerProperties);
+ ConcurrentKafkaListenerContainerFactory<String, String> factory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(dltConsumerFactory);
+ return factory;