Tags für die Übung ergänzt
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationConfiguration.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.producer.KafkaProducer;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 import org.springframework.context.annotation.Bean;
7 import org.springframework.context.annotation.Configuration;
8
9 import java.util.Properties;
10
11
12 @Configuration
13 @EnableConfigurationProperties(ApplicationProperties.class)
14 public class ApplicationConfiguration
15 {
16   @Bean
17   public RestProducer restProducer(
18       ApplicationProperties properties,
19       KafkaProducer<String, String> kafkaProducer)
20   {
21     return
22         new RestProducer(
23             properties.getClientId(),
24             properties.getTopic(),
25             properties.getPartition(),
26             kafkaProducer);
27   }
28
29   @Bean(destroyMethod = "close")
30   public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
31   {
32     Properties props = new Properties();
33     props.put("bootstrap.servers", properties.getBootstrapServer());
34     props.put("client.id", properties.getClientId());
35     props.put("acks", properties.getAcks());
36     props.put("batch.size", properties.getBatchSize());
37     props.put("delivery.timeout.ms", 20000); // 20 Sekunden
38     props.put("request.timeout.ms",  10000); // 10 Sekunden
39     props.put("linger.ms", properties.getLingerMs());
40     props.put("compression.type", properties.getCompressionType());
41     props.put("key.serializer", StringSerializer.class.getName());
42     props.put("value.serializer", StringSerializer.class.getName());
43
44     return new KafkaProducer<>(props);
45   }
46 }