`EndlessConsumer` auf `@KafkaHandler` umgestellt
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationConfiguration.java
1 package de.juplo.kafka;
2
3 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
4 import org.springframework.boot.context.properties.EnableConfigurationProperties;
5 import org.springframework.context.annotation.Bean;
6 import org.springframework.context.annotation.Configuration;
7
8 import java.util.Optional;
9
10 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
11
12
13 @Configuration
14 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
15 public class ApplicationConfiguration
16 {
17   @Bean
18   public ApplicationRecordHandler applicationRecordHandler(
19       AdderResults adderResults,
20       KafkaProperties kafkaProperties,
21       ApplicationProperties applicationProperties)
22   {
23     return new ApplicationRecordHandler(
24         adderResults,
25         Optional.ofNullable(applicationProperties.getThrottle()),
26         kafkaProperties.getClientId());
27   }
28
29   @Bean
30   public AdderResults adderResults()
31   {
32     return new AdderResults();
33   }
34
35   @Bean
36   public ApplicationRebalanceListener rebalanceListener(
37       ApplicationRecordHandler recordHandler,
38       AdderResults adderResults,
39       StateRepository stateRepository,
40       KafkaProperties kafkaProperties)
41   {
42     return new ApplicationRebalanceListener(
43         recordHandler,
44         adderResults,
45         stateRepository,
46         kafkaProperties.getClientId());
47   }
48
49   @Bean
50   public ApplicationErrorHandler applicationErrorHandler()
51   {
52     return new ApplicationErrorHandler();
53   }
54
55   @Bean
56   public EndlessConsumer endlessConsumer(
57       RecordHandler recordHandler,
58       ApplicationErrorHandler errorHandler,
59       KafkaProperties kafkaProperties,
60       KafkaListenerEndpointRegistry endpointRegistry)
61   {
62     return
63         new EndlessConsumer(
64             kafkaProperties.getClientId(),
65             endpointRegistry,
66             errorHandler,
67             recordHandler);
68   }
69 }