From: Kai Moritz Date: Wed, 7 May 2025 18:14:01 +0000 (+0200) Subject: 2. Schritt Live-Umbau: Producer über Factory erzeugen X-Git-Tag: spring/spring-producer--livecoding--schritte--2025-05-lvm~3 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=eb54f1e206c8a0773a84fc8a0ae348aa09cae4ab;p=demos%2Fkafka%2Ftraining 2. Schritt Live-Umbau: Producer über Factory erzeugen --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0090cee..6088992 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,15 +1,13 @@ package de.juplo.kafka; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ProducerFactory; import java.time.Duration; -import java.util.Properties; @Configuration @@ -34,23 +32,8 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public Producer kafkaProducer(ProducerFactory producerFactory) { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getProducerProperties().getAcks()); - props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis()); - props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis()); - props.put("buffer.memory", properties.getProducerProperties().getBufferMemory()); - props.put("batch.size", properties.getProducerProperties().getBatchSize()); - props.put("metadata.max.age.ms", 5000); // 5 Sekunden - props.put("request.timeout.ms", 5000); // 5 Sekunden - props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); - props.put("compression.type", properties.getProducerProperties().getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - return new KafkaProducer<>(props); + return producerFactory.createProducer(); } }