+ @Bean
+ public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
+ {
+ return new TestRecordHandler(applicationRecordHandler);
+ }
+
+ @Bean(destroyMethod = "close")
+ public org.apache.kafka.clients.consumer.Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
+ {
+ return factory.createConsumer();
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, String> dltContainerFactory(
+ KafkaProperties properties)
+ {
+ Map<String, Object> consumerProperties = new HashMap<>();
+
+ consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ DefaultKafkaConsumerFactory dltConsumerFactory =
+ new DefaultKafkaConsumerFactory<>(consumerProperties);
+ ConcurrentKafkaListenerContainerFactory<String, String> factory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(dltConsumerFactory);
+ return factory;
+ }
+
+ @Bean
+ public DeadLetterTopicConsumer deadLetterTopicConsumer()
+ {
+ return new DeadLetterTopicConsumer();
+ }