X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=0000000000000000000000000000000000000000;hb=25c2044064722af20f64651a32e94fb392710bbc;hp=b5f6187c82f70f0c5dfecb40845fa60fc6846ffe;hpb=66ff7d205e66616de8aaca94503dbbcd7d281f6d;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java deleted file mode 100644 index b5f6187..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ /dev/null @@ -1,110 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.Map; -import java.util.Optional; - -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaOperations; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; -import org.springframework.kafka.listener.DefaultErrorHandler; -import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.util.backoff.FixedBackOff; - - -@Configuration -@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) -public class ApplicationConfiguration -{ - @Bean - public ApplicationRecordHandler applicationRecordHandler( - AdderResults adderResults, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) - { - return new ApplicationRecordHandler( - adderResults, - Optional.ofNullable(applicationProperties.getThrottle()), - kafkaProperties.getClientId()); - } - - @Bean - public AdderResults adderResults() - { - return new AdderResults(); - } - - @Bean - public ApplicationRebalanceListener rebalanceListener( - ApplicationRecordHandler recordHandler, - AdderResults adderResults, - StateRepository stateRepository, - KafkaProperties kafkaProperties) - { - return new ApplicationRebalanceListener( - recordHandler, - adderResults, - stateRepository, - kafkaProperties.getClientId()); - } - - @Bean - public EndlessConsumer endlessConsumer( - RecordHandler recordHandler, - KafkaProperties kafkaProperties, - KafkaListenerEndpointRegistry endpointRegistry) - { - return - new EndlessConsumer( - kafkaProperties.getClientId(), - endpointRegistry, - recordHandler); - } - - @Bean - public ProducerFactory producerFactory( - KafkaProperties properties) - { - return new DefaultKafkaProducerFactory<>( - properties.getProducer().buildProperties(), - new StringSerializer(), - new DelegatingByTypeSerializer( - Map.of( - byte[].class, new ByteArraySerializer(), - MessageAddNumber.class, new JsonSerializer<>(), - MessageCalculateSum.class, new JsonSerializer<>()))); - } - - @Bean - public KafkaTemplate kafkaTemplate( - ProducerFactory producerFactory) - { - return new KafkaTemplate<>(producerFactory); - } - - @Bean - public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer( - KafkaOperations kafkaTemplate) - { - return new DeadLetterPublishingRecoverer(kafkaTemplate); - } - - @Bean - public DefaultErrorHandler errorHandler( - DeadLetterPublishingRecoverer recoverer) - { - return new DefaultErrorHandler( - recoverer, - new FixedBackOff(0l, 0l)); - } -}