1 package de.juplo.kafka;
3 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
4 import org.springframework.boot.context.properties.EnableConfigurationProperties;
5 import org.springframework.context.annotation.Bean;
6 import org.springframework.context.annotation.Configuration;
8 import java.util.Optional;
10 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
14 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
15 public class ApplicationConfiguration
/**
 * Creates the {@link ApplicationRecordHandler}.
 *
 * NOTE(review): this listing has gaps in its embedded line numbering
 * (e.g. 23 to 25), so annotations, braces and at least one constructor
 * argument are not visible here. Presumably this is a Spring bean factory
 * method; confirm against the complete file.
 *
 * @param adderResults injected results holder; how it is used is not visible in this listing
 * @param kafkaProperties supplies the client id handed to the handler
 * @param applicationProperties supplies the throttle setting handed to the handler
 * @return a newly constructed {@link ApplicationRecordHandler}
 */
18 public ApplicationRecordHandler applicationRecordHandler(
19 AdderResults adderResults,
20 KafkaProperties kafkaProperties,
21 ApplicationProperties applicationProperties)
23 return new ApplicationRecordHandler(
// Optional.ofNullable turns a null throttle into Optional.empty() rather than throwing.
25 Optional.ofNullable(applicationProperties.getThrottle()),
26 kafkaProperties.getClientId());
/**
 * Creates a new {@link AdderResults} instance using its no-argument constructor.
 *
 * NOTE(review): the annotation and brace lines of this method are not visible
 * in this listing; presumably a Spring bean factory method, to be confirmed.
 *
 * @return a newly constructed {@link AdderResults}
 */
30 public AdderResults adderResults()
32 return new AdderResults();
/**
 * Creates the {@link ApplicationRebalanceListener}.
 *
 * NOTE(review): the embedded line numbering jumps from 42 to 46, so most of
 * the constructor arguments are not visible in this listing. Presumably
 * recordHandler, adderResults and stateRepository are passed on the hidden
 * lines; confirm against the complete file.
 *
 * @param recordHandler injected record handler; its use is not visible in this listing
 * @param adderResults injected results holder; its use is not visible in this listing
 * @param stateRepository injected state repository; its use is not visible in this listing
 * @param kafkaProperties supplies the client id handed to the listener
 * @return a newly constructed {@link ApplicationRebalanceListener}
 */
36 public ApplicationRebalanceListener rebalanceListener(
37 ApplicationRecordHandler recordHandler,
38 AdderResults adderResults,
39 StateRepository stateRepository,
40 KafkaProperties kafkaProperties)
42 return new ApplicationRebalanceListener(
// Last visible argument: the client id read from the Kafka properties.
46 kafkaProperties.getClientId());
/**
 * Creates a new {@link ApplicationErrorHandler} instance using its no-argument constructor.
 *
 * NOTE(review): the annotation and brace lines of this method are not visible
 * in this listing; presumably a Spring bean factory method, to be confirmed.
 *
 * @return a newly constructed {@link ApplicationErrorHandler}
 */
50 public ApplicationErrorHandler applicationErrorHandler()
52 return new ApplicationErrorHandler();
56 public EndlessConsumer endlessConsumer(
57 RecordHandler recordHandler,
58 ApplicationErrorHandler errorHandler,
59 KafkaProperties kafkaProperties,
60 KafkaListenerEndpointRegistry endpointRegistry)
64 kafkaProperties.getClientId(),