+ return new DeadLetterPublishingRecoverer(
+ template,
+ (record, exception) -> new TopicPartition(properties.getDlqTopic(), record.partition()));
+ }
+
+ @Bean
+ public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
+ {
+ return new DefaultErrorHandler(recoverer, new FixedBackOff(0l, 0l));
+ }
+
+ @Bean(destroyMethod = "close")
+ public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
+ {
+ return factory.createConsumer();