1 package de.juplo.kafka;
3 import org.apache.kafka.clients.consumer.KafkaConsumer;
4 import org.apache.kafka.clients.producer.KafkaProducer;
5 import org.apache.kafka.common.serialization.IntegerDeserializer;
6 import org.apache.kafka.common.serialization.StringDeserializer;
7 import org.apache.kafka.common.serialization.StringSerializer;
8 import org.springframework.boot.context.properties.EnableConfigurationProperties;
9 import org.springframework.context.annotation.Bean;
10 import org.springframework.context.annotation.Configuration;
11 import org.springframework.kafka.support.serializer.JsonSerializer;
13 import java.util.Optional;
14 import java.util.Properties;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
// Spring configuration that wires the Kafka consumer/producer beans for this
// application, driven by the externalized settings in ApplicationProperties.
// NOTE(review): this listing has interior lines elided (opening braces,
// @Bean annotations, and some arguments are missing); comments below state
// only what the visible lines establish.
20 @EnableConfigurationProperties(ApplicationProperties.class)
21 public class ApplicationConfiguration
// Builds the record handler that produces results to the outgoing topic.
// Visible constructor arguments: an Optional error position, the client id,
// and the output topic name; the injected KafkaProducer is presumably passed
// on an elided line -- TODO confirm against the full source.
24 public ApplicationRecordHandler recordHandler(
25 KafkaProducer<String, Object> kafkaProducer,
26 ApplicationProperties properties)
28 return new ApplicationRecordHandler(
30 Optional.ofNullable(properties.getErrorPosition()),
31 properties.getClientId(),
32 properties.getTopicOut());
// Creates the EndlessConsumer that polls the incoming topic and dispatches
// records to the ApplicationRecordHandler on the injected executor.
// Only the client-id and input-topic arguments are visible here; the
// remaining EndlessConsumer constructor arguments (executor, kafkaConsumer,
// recordHandler) appear to be on elided lines -- TODO confirm against the
// full source.
36 public EndlessConsumer<String, Integer> endlessConsumer(
37 KafkaConsumer<String, Integer> kafkaConsumer,
38 ExecutorService executor,
39 ApplicationRecordHandler recordHandler,
40 ApplicationProperties properties)
43 new EndlessConsumer<>(
45 properties.getClientId(),
46 properties.getTopicIn(),
// Single-threaded executor that runs the endless consumer poll loop.
// NOTE(review): no explicit shutdown is visible in this listing -- presumably
// handled via an elided destroyMethod or elsewhere; verify so the thread is
// not leaked on context close.
52 public ExecutorService executor()
54 return Executors.newSingleThreadExecutor();
// Consumer bean; Spring invokes close() on context shutdown so the consumer
// leaves its group cleanly.
57 @Bean(destroyMethod = "close")
58 public KafkaConsumer<String, Integer> kafkaConsumer(ApplicationProperties properties)
60 Properties props = new Properties();
// Connection and behavior settings come from ApplicationProperties.
62 props.put("bootstrap.servers", properties.getBootstrapServer())
// Cooperative rebalancing: partitions are revoked incrementally rather than
// stop-the-world on every group rebalance.
63 props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
64 props.put("group.id", properties.getGroupId());
65 props.put("client.id", properties.getClientId());
66 props.put("auto.offset.reset", properties.getAutoOffsetReset());
// NOTE(review): the narrowing (int) cast silently wraps for intervals larger
// than Integer.MAX_VALUE ms (~24.8 days); Math.toIntExact(...) would fail
// fast instead. Worth fixing when the full source is available.
67 props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
// Low metadata max age so topic/partition changes are picked up quickly.
68 props.put("metadata.max.age.ms", "1000");
69 props.put("key.deserializer", StringDeserializer.class.getName());
70 props.put("value.deserializer", IntegerDeserializer.class.getName());
72 return new KafkaConsumer<>(props);
// Producer bean; Spring invokes close() on context shutdown, which flushes
// any buffered records before releasing resources.
75 @Bean(destroyMethod = "close")
76 public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
78 Properties props = new Properties();
79 props.put("bootstrap.servers", properties.getBootstrapServer());
80 props.put("client.id", properties.getClientId());
81 props.put("acks", properties.getAcks());
82 props.put("batch.size", properties.getBatchSize());
// Fixed send timeouts (hard-coded rather than configurable -- presumably
// intentional for this example application; confirm before reuse).
83 props.put("delivery.timeout.ms", 20000); // 20 seconds
84 props.put("request.timeout.ms", 10000); // 10 seconds
85 props.put("linger.ms", properties.getLingerMs());
86 props.put("compression.type", properties.getCompressionType());
87 props.put("key.serializer", StringSerializer.class.getName());
88 props.put("value.serializer", JsonSerializer.class.getName());
// Maps the logical type tokens "ADD" / "CALC" to the concrete message
// classes, so the JsonSerializer writes short type names into the record
// headers instead of fully qualified class names.
89 props.put(JsonSerializer.TYPE_MAPPINGS,
90 "ADD:" + AddNumberMessage.class.getName() + "," +
91 "CALC:" + CalculateSumMessage.class.getName());
94 return new KafkaProducer<>(props);