From eb54f1e206c8a0773a84fc8a0ae348aa09cae4ab Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 7 May 2025 20:14:01 +0200 Subject: [PATCH] =?utf8?q?2.=20Schritt=20Live-Umbau:=20Producer=20=C3=BCbe?= =?utf8?q?r=20Factory=20erzeugen?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 23 +++---------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0090cee..6088992 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,15 +1,13 @@ package de.juplo.kafka; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ProducerFactory; import java.time.Duration; -import java.util.Properties; @Configuration @@ -34,23 +32,8 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public Producer kafkaProducer(ProducerFactory producerFactory) { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getProducerProperties().getAcks()); - props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis()); - props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis()); - props.put("buffer.memory", properties.getProducerProperties().getBufferMemory()); - props.put("batch.size", properties.getProducerProperties().getBatchSize()); - props.put("metadata.max.age.ms", 5000); // 5 Sekunden - props.put("request.timeout.ms", 5000); // 5 Sekunden - props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); - props.put("compression.type", properties.getProducerProperties().getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - return new KafkaProducer<>(props); + return producerFactory.createProducer(); } } -- 2.20.1