1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.ByteArraySerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.springframework.boot.SpringApplication;
7 import org.springframework.boot.autoconfigure.SpringBootApplication;
8 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
9 import org.springframework.boot.context.properties.EnableConfigurationProperties;
10 import org.springframework.context.annotation.Bean;
11 import org.springframework.kafka.annotation.EnableKafka;
12 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
13 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
14 import org.springframework.kafka.core.KafkaOperations;
15 import org.springframework.kafka.core.KafkaTemplate;
16 import org.springframework.kafka.core.ProducerFactory;
17 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
18 import org.springframework.kafka.listener.DefaultErrorHandler;
19 import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
20 import org.springframework.kafka.support.serializer.JsonSerializer;
21 import org.springframework.util.backoff.FixedBackOff;
24 import java.util.Optional;
/**
 * Spring Boot application class that also declares the application's
 * collaborators: the record handler, the result store, the rebalance
 * listener, the health indicator and the Kafka producer, dead-letter and
 * error-handling infrastructure.
 *
 * NOTE(review): the line numbers embedded in this listing have gaps, so some
 * lines (braces, annotations, individual call arguments) are not shown here.
 * The comments below describe only what is visible.
 */
27 @SpringBootApplication
// Enables configuration-properties support for Spring Boot's KafkaProperties
// and for the project's own ApplicationProperties.
29 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
31 public class Application
/**
 * Creates the {@link ApplicationRecordHandler}.
 *
 * @param adderResults presumably forwarded to the handler; the argument line
 *                     is not visible in this listing - TODO confirm
 * @param kafkaProperties source of the consumer group id
 * @param applicationProperties source of the throttle setting
 * @return a new handler instance
 */
34 public ApplicationRecordHandler applicationRecordHandler(
35 AdderResults adderResults,
36 KafkaProperties kafkaProperties,
37 ApplicationProperties applicationProperties)
39 return new ApplicationRecordHandler(
// getThrottle() may return null, so the value is handed over as an Optional.
41 Optional.ofNullable(applicationProperties.getThrottle()),
42 kafkaProperties.getConsumer().getGroupId());
/**
 * Creates the {@link AdderResults} instance via its no-argument constructor.
 *
 * @return a new, freshly constructed {@code AdderResults}
 */
46 public AdderResults adderResults()
48 return new AdderResults();
/**
 * Creates the {@link ApplicationRebalanceListener}.
 *
 * Only the last constructor argument (the consumer group id) is visible in
 * this listing; the other parameters are presumably passed on the lines that
 * are not shown - TODO confirm.
 *
 * @param recordHandler the application's record handler
 * @param adderResults the shared results instance
 * @param stateRepository the state repository
 * @param kafkaProperties source of the consumer group id
 * @return a new rebalance listener
 */
52 public ApplicationRebalanceListener rebalanceListener(
53 ApplicationRecordHandler recordHandler,
54 AdderResults adderResults,
55 StateRepository stateRepository,
56 KafkaProperties kafkaProperties)
58 return new ApplicationRebalanceListener(
62 kafkaProperties.getConsumer().getGroupId());
/**
 * Creates the {@link ApplicationHealthIndicator}.
 *
 * Unlike the other factory methods in this class, this one is declared
 * without the {@code public} modifier.
 *
 * @param registry the listener endpoint registry; presumably passed as a
 *                 further constructor argument on a line not shown - TODO confirm
 * @param properties source of the consumer group id (first constructor argument)
 * @return a new health indicator
 */
66 ApplicationHealthIndicator applicationHealthIndicator(
67 KafkaListenerEndpointRegistry registry,
68 KafkaProperties properties)
70 return new ApplicationHealthIndicator(
71 properties.getConsumer().getGroupId(),
/**
 * Creates the producer factory for String keys and values of several types.
 *
 * The producer configuration is built from {@code properties.getProducer()}.
 * Keys use a {@link StringSerializer}; values use a
 * {@link DelegatingByTypeSerializer} that selects the serializer by value type.
 *
 * @param properties the Kafka properties providing the producer settings
 * @return a producer factory for {@code <String, Object>}
 */
76 public ProducerFactory<String, Object> producerFactory(
77 KafkaProperties properties)
79 return new DefaultKafkaProducerFactory<>(
80 properties.getProducer().buildProperties(),
81 new StringSerializer(),
// NOTE(review): constructed as a raw type (no type argument or diamond).
// The call that builds the type-to-serializer mapping sits on a line that is
// not visible in this listing.
82 new DelegatingByTypeSerializer(
// Raw byte arrays are written unchanged - presumably so that dead-letter
// publishing can forward payloads as-is; TODO confirm.
84 byte[].class, new ByteArraySerializer(),
// The two message types are serialized with a JsonSerializer each.
85 MessageAddNumber.class, new JsonSerializer<>(),
86 MessageCalculateSum.class, new JsonSerializer<>())));
/**
 * Creates the {@link KafkaTemplate} on top of the given producer factory.
 *
 * @param producerFactory the factory used by the template to obtain producers
 * @return a new template for {@code <String, Object>}
 */
90 public KafkaTemplate<String, Object> kafkaTemplate(
91 ProducerFactory<String, Object> producerFactory)
93 return new KafkaTemplate<>(producerFactory);
/**
 * Creates the {@link DeadLetterPublishingRecoverer}.
 *
 * Only the template is passed, so no custom destination resolver is
 * configured here.
 *
 * @param kafkaTemplate the Kafka operations used for publishing
 * @return a new recoverer
 */
97 public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
98 KafkaOperations<?, ?> kafkaTemplate)
100 return new DeadLetterPublishingRecoverer(kafkaTemplate);
/**
 * Creates the {@link DefaultErrorHandler}.
 *
 * @param recoverer the dead-letter recoverer; presumably passed as the first
 *                  constructor argument on a line not shown - TODO confirm
 * @return a new error handler using a {@link FixedBackOff} of {@code (0, 0)}
 */
104 public DefaultErrorHandler errorHandler(
105 DeadLetterPublishingRecoverer recoverer)
107 return new DefaultErrorHandler(
// Both FixedBackOff arguments (interval, max attempts) are 0 - presumably this
// means no retries before recovery; confirm against the Spring documentation.
// NOTE(review): prefer the upper-case suffix (0L); a lower-case 'l' is easily
// misread as the digit 1.
109 new FixedBackOff(0l, 0l));
/**
 * Program entry point: starts the Spring application with this class as the
 * primary source.
 *
 * @param args command-line arguments, passed through to Spring Boot
 */
113 public static void main(String[] args)
115 SpringApplication.run(Application.class, args);