X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=0642aa444665ef51d296d3c4a6bee1be1644302e;hb=0dadf92f6a0724e95385c4e054aff1f800ef7375;hp=0000000000000000000000000000000000000000;hpb=1b978296e798614b3ca8317b43acd1a44a774ecd;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 0000000..0642aa4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,46 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Properties; + + +@Configuration +@EnableConfigurationProperties(ApplicationProperties.class) +public class ApplicationConfiguration +{ + @Bean + public RestProducer restProducer( + ApplicationProperties properties, + KafkaProducer kafkaProducer) + { + return + new RestProducer( + properties.getClientId(), + properties.getTopic(), + properties.getPartition(), + kafkaProducer); + } + + @Bean(destroyMethod = "close") + public KafkaProducer kafkaProducer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", properties.getClientId()); + props.put("acks", properties.getAcks()); + props.put("batch.size", properties.getBatchSize()); + props.put("delivery.timeout.ms", 20000); // 20 Sekunden + props.put("request.timeout.ms", 10000); // 10 Sekunden + props.put("linger.ms", properties.getLingerMs()); + props.put("compression.type", properties.getCompressionType()); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + return new KafkaProducer<>(props); + } +}