Konfig-Parameter zum künstlichen Verzögern der Verarbeitung eingebaut
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationConfiguration.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.consumer.KafkaConsumer;
4 import org.apache.kafka.common.serialization.StringDeserializer;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.context.annotation.Configuration;
8
9 import java.time.Clock;
10 import java.util.Optional;
11 import java.util.Properties;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.Executors;
14
15
16 @Configuration
17 @EnableConfigurationProperties(ApplicationProperties.class)
18 public class ApplicationConfiguration
19 {
20   @Bean
21   public ApplicationRecordHandler recordHandler(
22       AdderResults adderResults,
23       ApplicationProperties properties)
24   {
25     return new ApplicationRecordHandler(
26         adderResults,
27         Optional.ofNullable(properties.getThrottle()));
28   }
29
30   @Bean
31   public AdderResults adderResults()
32   {
33     return new AdderResults();
34   }
35
36   @Bean
37   public ApplicationRebalanceListener rebalanceListener(
38       ApplicationRecordHandler recordHandler,
39       AdderResults adderResults,
40       StateRepository stateRepository,
41       ApplicationProperties properties)
42   {
43     return new ApplicationRebalanceListener(
44         recordHandler,
45         adderResults,
46         stateRepository,
47         properties.getClientId());
48   }
49
50   @Bean
51   public EndlessConsumer<String, String> endlessConsumer(
52       KafkaConsumer<String, String> kafkaConsumer,
53       ExecutorService executor,
54       ApplicationRebalanceListener rebalanceListener,
55       ApplicationRecordHandler recordHandler,
56       ApplicationProperties properties)
57   {
58     return
59         new EndlessConsumer<>(
60             executor,
61             properties.getClientId(),
62             properties.getTopic(),
63             kafkaConsumer,
64             rebalanceListener,
65             recordHandler);
66   }
67
68   @Bean
69   public ExecutorService executor()
70   {
71     return Executors.newSingleThreadExecutor();
72   }
73
74   @Bean(destroyMethod = "close")
75   public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
76   {
77     Properties props = new Properties();
78
79     props.put("bootstrap.servers", properties.getBootstrapServer());
80     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
81     props.put("group.id", properties.getGroupId());
82     props.put("client.id", properties.getClientId());
83     props.put("auto.offset.reset", properties.getAutoOffsetReset());
84     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
85     props.put("metadata.max.age.ms", "1000");
86     props.put("key.deserializer", StringDeserializer.class.getName());
87     props.put("value.deserializer", StringDeserializer.class.getName());
88
89     return new KafkaConsumer<>(props);
90   }
91 }