+ public ProducerFactory<String, Object> producerFactory(KafkaProperties properties) {
+ return new DefaultKafkaProducerFactory<>(
+ properties.getProducer().buildProperties(),
+ new StringSerializer(),
+ new DelegatingByTypeSerializer(Map.of(
+ byte[].class, new ByteArraySerializer(),
+ ClientMessage.class, new JsonSerializer<>())));
+ }
+
+ @Bean
+ public KafkaTemplate<String, Object> kafkaTemplate(
+ ProducerFactory<String, Object> producerFactory) {
+
+ return new KafkaTemplate<>(producerFactory);
+ }
+
+ @Bean
+ public DeadLetterPublishingRecoverer recoverer(
+ ApplicationProperties properties,
+ KafkaOperations<?, ?> template)
+ {
+ return new DeadLetterPublishingRecoverer(
+ template,
+ (record, exception) -> new TopicPartition(properties.getDlqTopic(), record.partition()));
+ }
+
+ @Bean
+ public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
+ {
+ return new DefaultErrorHandler(recoverer, new FixedBackOff(0l, 0l));
+ }
+
+ @Bean(destroyMethod = "close")
+ public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)