b5f6187c82f70f0c5dfecb40845fa60fc6846ffe
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationConfiguration.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.common.serialization.ByteArraySerializer;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
6 import org.springframework.boot.context.properties.EnableConfigurationProperties;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.context.annotation.Configuration;
9
10 import java.util.Map;
11 import java.util.Optional;
12
13 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
14 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
15 import org.springframework.kafka.core.KafkaOperations;
16 import org.springframework.kafka.core.KafkaTemplate;
17 import org.springframework.kafka.core.ProducerFactory;
18 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
19 import org.springframework.kafka.listener.DefaultErrorHandler;
20 import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
21 import org.springframework.kafka.support.serializer.JsonSerializer;
22 import org.springframework.util.backoff.FixedBackOff;
23
24
25 @Configuration
26 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
27 public class ApplicationConfiguration
28 {
29   @Bean
30   public ApplicationRecordHandler applicationRecordHandler(
31       AdderResults adderResults,
32       KafkaProperties kafkaProperties,
33       ApplicationProperties applicationProperties)
34   {
35     return new ApplicationRecordHandler(
36         adderResults,
37         Optional.ofNullable(applicationProperties.getThrottle()),
38         kafkaProperties.getClientId());
39   }
40
41   @Bean
42   public AdderResults adderResults()
43   {
44     return new AdderResults();
45   }
46
47   @Bean
48   public ApplicationRebalanceListener rebalanceListener(
49       ApplicationRecordHandler recordHandler,
50       AdderResults adderResults,
51       StateRepository stateRepository,
52       KafkaProperties kafkaProperties)
53   {
54     return new ApplicationRebalanceListener(
55         recordHandler,
56         adderResults,
57         stateRepository,
58         kafkaProperties.getClientId());
59   }
60
61   @Bean
62   public EndlessConsumer endlessConsumer(
63       RecordHandler recordHandler,
64       KafkaProperties kafkaProperties,
65       KafkaListenerEndpointRegistry endpointRegistry)
66   {
67     return
68         new EndlessConsumer(
69             kafkaProperties.getClientId(),
70             endpointRegistry,
71             recordHandler);
72   }
73
74   @Bean
75   public ProducerFactory<String, Object> producerFactory(
76       KafkaProperties properties)
77   {
78     return new DefaultKafkaProducerFactory<>(
79         properties.getProducer().buildProperties(),
80         new StringSerializer(),
81         new DelegatingByTypeSerializer(
82             Map.of(
83                 byte[].class, new ByteArraySerializer(),
84                 MessageAddNumber.class, new JsonSerializer<>(),
85                 MessageCalculateSum.class, new JsonSerializer<>())));
86   }
87
88   @Bean
89   public KafkaTemplate<String, Object> kafkaTemplate(
90       ProducerFactory<String, Object> producerFactory)
91   {
92     return new KafkaTemplate<>(producerFactory);
93   }
94
95   @Bean
96   public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
97       KafkaOperations<?, ?> kafkaTemplate)
98   {
99     return new DeadLetterPublishingRecoverer(kafkaTemplate);
100   }
101
102   @Bean
103   public DefaultErrorHandler errorHandler(
104       DeadLetterPublishingRecoverer recoverer)
105   {
106     return new DefaultErrorHandler(
107         recoverer,
108         new FixedBackOff(0l, 0l));
109   }
110 }