X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplication.java;h=a72431e6a6925ee2993f7b1927535a49bac7d397;hb=f0b07ceb43773f772c706ecf6422608878805170;hp=69a97125726ef3a84ac885d226bc69db2051cc4f;hpb=25c2044064722af20f64651a32e94fb392710bbc;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 69a9712..a72431e 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,117 +1,24 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaOperations; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; -import org.springframework.kafka.listener.DefaultErrorHandler; -import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.util.backoff.FixedBackOff; - -import java.util.Map; -import java.util.Optional; - +import org.springframework.kafka.annotation.KafkaListener; +// tag::supersimple[] @SpringBootApplication @Slf4j -@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) -@EnableKafka public class Application { - @Bean - public ApplicationRecordHandler applicationRecordHandler( - AdderResults adderResults, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) - { - return new ApplicationRecordHandler( - adderResults, - Optional.ofNullable(applicationProperties.getThrottle()), - kafkaProperties.getConsumer().getGroupId()); - } - - @Bean - public AdderResults adderResults() - { - return new AdderResults(); - } - - @Bean - public ApplicationRebalanceListener rebalanceListener( - ApplicationRecordHandler recordHandler, - AdderResults adderResults, - StateRepository stateRepository, - KafkaProperties kafkaProperties) - { - return new ApplicationRebalanceListener( - recordHandler, - adderResults, - stateRepository, - kafkaProperties.getConsumer().getGroupId()); - } - - @Bean - ApplicationHealthIndicator applicationHealthIndicator( - KafkaListenerEndpointRegistry registry, - KafkaProperties properties) - { - return new ApplicationHealthIndicator( - properties.getConsumer().getGroupId(), - registry); - } - - @Bean - public ProducerFactory producerFactory( - KafkaProperties properties) - { - return new DefaultKafkaProducerFactory<>( - properties.getProducer().buildProperties(), - new StringSerializer(), - new DelegatingByTypeSerializer( - Map.of( - byte[].class, new ByteArraySerializer(), - MessageAddNumber.class, new JsonSerializer<>(), - MessageCalculateSum.class, new JsonSerializer<>()))); - } - - @Bean - public KafkaTemplate kafkaTemplate( - ProducerFactory producerFactory) - { - return new KafkaTemplate<>(producerFactory); - } - - @Bean - public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer( - KafkaOperations kafkaTemplate) - { - return new DeadLetterPublishingRecoverer(kafkaTemplate); - } - - @Bean - public DefaultErrorHandler errorHandler( - DeadLetterPublishingRecoverer recoverer) + @KafkaListener(id = "supersimple", topics = "out") + public void recieve(String message) { - return new DefaultErrorHandler( - recoverer, - new FixedBackOff(0l, 0l)); + log.info("Recieved message: {}", message); } - public static void main(String[] args) { SpringApplication.run(Application.class, args); } } +// end::supersimple[] \ No newline at end of file