From ca0c08b4c1d6dfac5234e564870895676000ae63 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 18 Mar 2025 16:48:45 +0100 Subject: [PATCH] Umbau auf Autoconfig von Spring Kafka --- pom.xml | 9 ++----- .../juplo/kafka/ApplicationConfiguration.java | 27 +++++-------------- .../de/juplo/kafka/ApplicationProperties.java | 23 ---------------- src/main/resources/application.yml | 22 ++++++++------- 4 files changed, 21 insertions(+), 60 deletions(-) diff --git a/pom.xml b/pom.xml index f64266b4..0677e4d9 100644 --- a/pom.xml +++ b/pom.xml @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -53,11 +53,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0090ceea..efdfafa1 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,15 +1,14 @@ package de.juplo.kafka; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ProducerFactory; import java.time.Duration; -import java.util.Properties; @Configuration @@ -18,13 +17,14 @@ public class ApplicationConfiguration { @Bean public ExampleProducer exampleProducer( + @Value("${spring.kafka.client-id}") String clientId, ApplicationProperties properties, Producer kafkaProducer, ConfigurableApplicationContext applicationContext) { return new ExampleProducer( - properties.getClientId(), + clientId, properties.getProducerProperties().getTopic(), properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) @@ -34,23 +34,8 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public Producer kafkaProducer(ProducerFactory producerFactory) { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getProducerProperties().getAcks()); - props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis()); - props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis()); - props.put("buffer.memory", properties.getProducerProperties().getBufferMemory()); - props.put("batch.size", properties.getProducerProperties().getBatchSize()); - props.put("metadata.max.age.ms", 5000); // 5 Sekunden - props.put("request.timeout.ms", 5000); // 5 Sekunden - props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); - props.put("compression.type", properties.getProducerProperties().getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - return new KafkaProducer<>(props); + return producerFactory.createProducer(); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 43232628..908072cb 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -16,13 +16,6 @@ import java.time.Duration; @Setter public class ApplicationProperties { - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - @NotNull private ProducerProperties producer; @@ -41,22 +34,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - @NotNull - @NotEmpty - private String acks; - @NotNull - private Duration deliveryTimeout; - @NotNull - private Duration maxBlock; - @NotNull - private Long bufferMemory; - @NotNull - private Integer batchSize; - @NotNull - private Duration linger; - @NotNull - @NotEmpty - private String compressionType; private Duration throttle; } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 98ea1284..008742c1 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,16 +1,20 @@ juplo: - bootstrap-server: :9092 - client-id: DEV producer: topic: test - acks: -1 - delivery-timeout: 10s - max-block: 5s - buffer-memory: 33554432 - batch-size: 16384 - linger: 0 - compression-type: gzip throttle: 500 +spring: + kafka: + bootstrap-servers: :9092 + client-id: DEV + producer: + acks: -1 + buffer-memory: 33554432 + batch-size: 16384 + compression-type: gzip + properties: + delivery-timeout: 10s + max-block: 5s + linger: 0 management: endpoint: shutdown: -- 2.20.1