+ public ProducerFactory<String, Object> producerFactory(KafkaProperties properties) {
+ return new DefaultKafkaProducerFactory<>(
+ properties.getProducer().buildProperties(),
+ new StringSerializer(),
+ new DelegatingByTypeSerializer(Map.of(
+ byte[].class, new ByteArraySerializer(),
+ ClientMessage.class, new JsonSerializer<>())));
+ }
+
+ @Bean
+ public KafkaTemplate<String, Object> kafkaTemplate(
+ ProducerFactory<String, Object> producerFactory) {
+
+ return new KafkaTemplate<>(producerFactory);
+ }
+
+ @Bean
+ public DeadLetterPublishingRecoverer recoverer(
+ ApplicationProperties properties,
+ KafkaOperations<?, ?> template)
+ {
+ return new DeadLetterPublishingRecoverer(
+ template,
+ (record, exception) -> new TopicPartition(properties.getDlqTopic(), record.partition()));
+ }
+
+ @Bean
+ public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)