- @Bean
- public ApplicationRecordHandler applicationRecordHandler(
- AdderResults adderResults,
- KafkaProperties kafkaProperties,
- ApplicationProperties applicationProperties)
- {
- return new ApplicationRecordHandler(
- adderResults,
- Optional.ofNullable(applicationProperties.getThrottle()),
- kafkaProperties.getConsumer().getGroupId());
- }
-
- @Bean
- public AdderResults adderResults()
- {
- return new AdderResults();
- }
-
- @Bean
- public ApplicationRebalanceListener rebalanceListener(
- ApplicationRecordHandler recordHandler,
- AdderResults adderResults,
- StateRepository stateRepository,
- KafkaProperties kafkaProperties)
- {
- return new ApplicationRebalanceListener(
- recordHandler,
- adderResults,
- stateRepository,
- kafkaProperties.getConsumer().getGroupId());
- }
-
- @Bean
- ApplicationHealthIndicator applicationHealthIndicator(
- KafkaListenerEndpointRegistry registry,
- KafkaProperties properties)
- {
- return new ApplicationHealthIndicator(
- properties.getConsumer().getGroupId(),
- registry);
- }
-
- @Bean
- public ProducerFactory<String, Object> producerFactory(
- KafkaProperties properties)
- {
- return new DefaultKafkaProducerFactory<>(
- properties.getProducer().buildProperties(),
- new StringSerializer(),
- new DelegatingByTypeSerializer(
- Map.of(
- byte[].class, new ByteArraySerializer(),
- MessageAddNumber.class, new JsonSerializer<>(),
- MessageCalculateSum.class, new JsonSerializer<>())));
- }
-
- @Bean
- public KafkaTemplate<String, Object> kafkaTemplate(
- ProducerFactory<String, Object> producerFactory)
- {
- return new KafkaTemplate<>(producerFactory);
- }
-
- @Bean
- public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
- KafkaOperations<?, ?> kafkaTemplate)
- {
- return new DeadLetterPublishingRecoverer(kafkaTemplate);
- }
-
- @Bean
- public DefaultErrorHandler errorHandler(
- DeadLetterPublishingRecoverer recoverer)