Wordcount-Implementierung mit Kafka-Boardmitteln und MongoDB als Storage
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationConfiguration.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.apache.kafka.clients.consumer.KafkaConsumer;
5 import org.apache.kafka.common.serialization.StringDeserializer;
6 import org.springframework.boot.context.properties.EnableConfigurationProperties;
7 import org.springframework.context.annotation.Bean;
8 import org.springframework.context.annotation.Configuration;
9
10 import java.time.Clock;
11 import java.util.Properties;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.Executors;
14 import java.util.function.Consumer;
15
16
17 @Configuration
18 @EnableConfigurationProperties(ApplicationProperties.class)
19 public class ApplicationConfiguration
20 {
21   @Bean
22   public EndlessConsumer endlessConsumer(
23       KafkaConsumer<String, String> kafkaConsumer,
24       ExecutorService executor,
25       PartitionStatisticsRepository repository,
26       ApplicationProperties properties)
27   {
28     return
29         new EndlessConsumer(
30             executor,
31             repository,
32             properties.getClientId(),
33             properties.getTopic(),
34             Clock.systemDefaultZone(),
35             properties.getCommitInterval(),
36             kafkaConsumer);
37   }
38
39   @Bean
40   public ExecutorService executor()
41   {
42     return Executors.newSingleThreadExecutor();
43   }
44
45   @Bean(destroyMethod = "close")
46   public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
47   {
48     Properties props = new Properties();
49
50     props.put("bootstrap.servers", properties.getBootstrapServer());
51     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
52     props.put("group.id", properties.getGroupId());
53     props.put("client.id", properties.getClientId());
54     props.put("enable.auto.commit", false);
55     props.put("auto.offset.reset", properties.getAutoOffsetReset());
56     props.put("metadata.max.age.ms", "1000");
57     props.put("key.deserializer", StringDeserializer.class.getName());
58     props.put("value.deserializer", StringDeserializer.class.getName());
59
60     return new KafkaConsumer<>(props);
61   }
62 }