- @Bean
- public ProducerFactory<String, Object> producerFactory(
- KafkaProperties properties)
- {
- return new DefaultKafkaProducerFactory<>(
- properties.getProducer().buildProperties(),
- new StringSerializer(),
- new DelegatingByTypeSerializer(
- Map.of(
- byte[].class, new ByteArraySerializer(),
- MessageAddNumber.class, new JsonSerializer<>(),
- MessageCalculateSum.class, new JsonSerializer<>())));
- }
-
- @Bean
- public KafkaTemplate<String, Object> kafkaTemplate(
- ProducerFactory<String, Object> producerFactory)
- {
- return new KafkaTemplate<>(producerFactory);
- }
-
- @Bean
- public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
- KafkaOperations<?, ?> kafkaTemplate)
- {
- return new DeadLetterPublishingRecoverer(kafkaTemplate);
- }
-
- @Bean
- public DefaultErrorHandler errorHandler(
- DeadLetterPublishingRecoverer recoverer)
- {
- return new DefaultErrorHandler(
- recoverer,
- new FixedBackOff(0l, 0l));
- }
-
-