Vereinfachte Version der auf Spring Kafka basierenden Implementierung
[demos/kafka/training] / src / main / java / de / juplo / kafka / Application.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.serialization.ByteArraySerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.springframework.boot.SpringApplication;
7 import org.springframework.boot.autoconfigure.SpringBootApplication;
8 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
9 import org.springframework.boot.context.properties.EnableConfigurationProperties;
10 import org.springframework.context.annotation.Bean;
11 import org.springframework.kafka.annotation.EnableKafka;
12 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
13 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
14 import org.springframework.kafka.core.KafkaOperations;
15 import org.springframework.kafka.core.KafkaTemplate;
16 import org.springframework.kafka.core.ProducerFactory;
17 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
18 import org.springframework.kafka.listener.DefaultErrorHandler;
19 import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
20 import org.springframework.kafka.support.serializer.JsonSerializer;
21 import org.springframework.util.backoff.FixedBackOff;
22
23 import java.util.Map;
24 import java.util.Optional;
25
26
27 @SpringBootApplication
28 @Slf4j
29 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
30 @EnableKafka
31 public class Application
32 {
33   @Bean
34   public ApplicationRecordHandler applicationRecordHandler(
35       AdderResults adderResults,
36       KafkaProperties kafkaProperties,
37       ApplicationProperties applicationProperties)
38   {
39     return new ApplicationRecordHandler(
40         adderResults,
41         Optional.ofNullable(applicationProperties.getThrottle()),
42         kafkaProperties.getConsumer().getGroupId());
43   }
44
45   @Bean
46   public AdderResults adderResults()
47   {
48     return new AdderResults();
49   }
50
51   @Bean
52   public ApplicationRebalanceListener rebalanceListener(
53       ApplicationRecordHandler recordHandler,
54       AdderResults adderResults,
55       StateRepository stateRepository,
56       KafkaProperties kafkaProperties)
57   {
58     return new ApplicationRebalanceListener(
59         recordHandler,
60         adderResults,
61         stateRepository,
62         kafkaProperties.getConsumer().getGroupId());
63   }
64
65   @Bean
66   ApplicationHealthIndicator applicationHealthIndicator(
67       KafkaListenerEndpointRegistry registry,
68       KafkaProperties properties)
69   {
70     return new ApplicationHealthIndicator(
71         properties.getConsumer().getGroupId(),
72         registry);
73   }
74
75   @Bean
76   public ProducerFactory<String, Object> producerFactory(
77       KafkaProperties properties)
78   {
79     return new DefaultKafkaProducerFactory<>(
80         properties.getProducer().buildProperties(),
81         new StringSerializer(),
82         new DelegatingByTypeSerializer(
83             Map.of(
84                 byte[].class, new ByteArraySerializer(),
85                 MessageAddNumber.class, new JsonSerializer<>(),
86                 MessageCalculateSum.class, new JsonSerializer<>())));
87   }
88
89   @Bean
90   public KafkaTemplate<String, Object> kafkaTemplate(
91       ProducerFactory<String, Object> producerFactory)
92   {
93     return new KafkaTemplate<>(producerFactory);
94   }
95
96   @Bean
97   public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
98       KafkaOperations<?, ?> kafkaTemplate)
99   {
100     return new DeadLetterPublishingRecoverer(kafkaTemplate);
101   }
102
103   @Bean
104   public DefaultErrorHandler errorHandler(
105       DeadLetterPublishingRecoverer recoverer)
106   {
107     return new DefaultErrorHandler(
108         recoverer,
109         new FixedBackOff(0l, 0l));
110   }
111
112
113   public static void main(String[] args)
114   {
115     SpringApplication.run(Application.class, args);
116   }
117 }