X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplication.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplication.java;h=0f9ea12c04ed6e352195d8d1e2b1bd65eb5c3ac6;hb=1bf30f5890d9ab0a1c7550fe472dec44f486a473;hp=69a97125726ef3a84ac885d226bc69db2051cc4f;hpb=25c2044064722af20f64651a32e94fb392710bbc;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 69a9712..0f9ea12 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,115 +1,22 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaOperations; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; -import org.springframework.kafka.listener.DefaultErrorHandler; -import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.util.backoff.FixedBackOff; - -import java.util.Map; -import java.util.Optional; - +import org.springframework.kafka.annotation.KafkaListener; @SpringBootApplication -@Slf4j -@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) @EnableKafka +@Slf4j public class Application { - @Bean - public ApplicationRecordHandler applicationRecordHandler( - AdderResults adderResults, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) - { - return new ApplicationRecordHandler( - adderResults, - Optional.ofNullable(applicationProperties.getThrottle()), - kafkaProperties.getConsumer().getGroupId()); - } - - @Bean - public AdderResults adderResults() - { - return new AdderResults(); - } - - @Bean - public ApplicationRebalanceListener rebalanceListener( - ApplicationRecordHandler recordHandler, - AdderResults adderResults, - StateRepository stateRepository, - KafkaProperties kafkaProperties) - { - return new ApplicationRebalanceListener( - recordHandler, - adderResults, - stateRepository, - kafkaProperties.getConsumer().getGroupId()); - } - - @Bean - ApplicationHealthIndicator applicationHealthIndicator( - KafkaListenerEndpointRegistry registry, - KafkaProperties properties) + @KafkaListener(id = "supersimple", topics = "out") + public void recieve(String message) { - return new ApplicationHealthIndicator( - properties.getConsumer().getGroupId(), - registry); + log.info("Recieved message: {}", message); } - @Bean - public ProducerFactory producerFactory( - KafkaProperties properties) - { - return new DefaultKafkaProducerFactory<>( - properties.getProducer().buildProperties(), - new StringSerializer(), - new DelegatingByTypeSerializer( - Map.of( - byte[].class, new ByteArraySerializer(), - MessageAddNumber.class, new JsonSerializer<>(), - MessageCalculateSum.class, new JsonSerializer<>()))); - } - - @Bean - public KafkaTemplate kafkaTemplate( - ProducerFactory producerFactory) - { - return new KafkaTemplate<>(producerFactory); - } - - @Bean - public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer( - KafkaOperations kafkaTemplate) - { - return new DeadLetterPublishingRecoverer(kafkaTemplate); - } - - @Bean - public DefaultErrorHandler errorHandler( - DeadLetterPublishingRecoverer recoverer) - { - return new DefaultErrorHandler( - recoverer, - new FixedBackOff(0l, 0l)); - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args);