From: Kai Moritz Date: Tue, 18 Mar 2025 15:48:45 +0000 (+0100) Subject: Umbau auf Autoconfig von Spring Kafka X-Git-Tag: spring/spring-producer--2025-04-signal-spickzettel~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ea6a78f715278252c80b104a0986d0985adf2220;p=demos%2Fkafka%2Ftraining Umbau auf Autoconfig von Spring Kafka --- diff --git a/README.sh b/README.sh index c8a0b22..77fc374 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-SNAPSHOT +IMAGE=juplo/spring-producer:2.0-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 00f68fc..59e7ed5 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,10 +136,10 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/spring-producer:2.0-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: producer + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: producer juplo.producer.topic: test consumer: diff --git a/pom.xml b/pom.xml index f64266b..be7dea5 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-producer Spring Producer A Simple Producer, based on Spring Boot, that sends messages via Kafka - 1.0-SNAPSHOT + 2.0-SNAPSHOT 21 @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -53,11 +53,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0090cee..efdfafa 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,15 +1,14 @@ package de.juplo.kafka; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ProducerFactory; import java.time.Duration; -import java.util.Properties; @Configuration @@ -18,13 +17,14 @@ public class ApplicationConfiguration { @Bean public ExampleProducer exampleProducer( + @Value("${spring.kafka.client-id}") String clientId, ApplicationProperties properties, Producer kafkaProducer, ConfigurableApplicationContext applicationContext) { return new ExampleProducer( - properties.getClientId(), + clientId, properties.getProducerProperties().getTopic(), properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) @@ -34,23 +34,8 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public Producer kafkaProducer(ProducerFactory producerFactory) { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getProducerProperties().getAcks()); - props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis()); - props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis()); - props.put("buffer.memory", properties.getProducerProperties().getBufferMemory()); - props.put("batch.size", properties.getProducerProperties().getBatchSize()); - props.put("metadata.max.age.ms", 5000); // 5 Sekunden - props.put("request.timeout.ms", 5000); // 5 Sekunden - props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); - props.put("compression.type", properties.getProducerProperties().getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - return new KafkaProducer<>(props); + return producerFactory.createProducer(); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 4323262..908072c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -16,13 +16,6 @@ import java.time.Duration; @Setter public class ApplicationProperties { - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - @NotNull private ProducerProperties producer; @@ -41,22 +34,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - @NotNull - @NotEmpty - private String acks; - @NotNull - private Duration deliveryTimeout; - @NotNull - private Duration maxBlock; - @NotNull - private Long bufferMemory; - @NotNull - private Integer batchSize; - @NotNull - private Duration linger; - @NotNull - @NotEmpty - private String compressionType; private Duration throttle; } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 21fef28..3f6c233 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,16 +1,22 @@ juplo: - bootstrap-server: :9092 - client-id: DEV producer: topic: test - acks: -1 - delivery-timeout: 10s - max-block: 5s - buffer-memory: 33554432 - batch-size: 16384 - linger: 0 - compression-type: gzip throttle: 500ms +spring: + kafka: + bootstrap-servers: :9092 + client-id: DEV + producer: + acks: -1 + buffer-memory: 33554432 + batch-size: 16384 + compression-type: gzip + properties: + metadata.max.age.ms: 5000 + request.timeout.ms: 5000 + delivery.timeout.ms: 10000 + max.block.ms: 5000 + linger.ms: 0 management: endpoint: shutdown: @@ -26,17 +32,8 @@ management: enabled: true info: kafka: - bootstrap-server: ${juplo.bootstrap-server} - client-id: ${juplo.client-id} producer: topic: ${juplo.producer.topic} - acks: ${juplo.producer.acks} - delivery-timeout: ${juplo.producer.delivery-timeout} - max-block: ${juplo.producer.max-block} - buffer-memory: ${juplo.producer.buffer-memory} - batch-size: ${juplo.producer.batch-size} - linger: ${juplo.producer.linger} - compression-type: ${juplo.producer.compression-type} throttle: ${juplo.producer.throttle} logging: level: