From 1a34f483982e29c4a5efd6da92dcbd70f151f27b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 18 Jan 2025 16:44:08 +0100 Subject: [PATCH] `ExampleProducer` auf das `KafkaTemplate` umgestellt --- README.sh | 2 +- build.gradle | 5 ++- docker/docker-compose.yml | 6 ++-- pom.xml | 13 +++---- .../juplo/kafka/ApplicationConfiguration.java | 34 ++++--------------- .../de/juplo/kafka/ApplicationProperties.java | 23 ------------- .../java/de/juplo/kafka/ExampleProducer.java | 23 +++++-------- src/main/resources/application.yml | 31 +++++++---------- .../java/de/juplo/kafka/ApplicationTests.java | 2 -- 9 files changed, 38 insertions(+), 101 deletions(-) diff --git a/README.sh b/README.sh index c8a0b221..501349a9 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-kafkatemplate-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 1429c4dd..ca6d64dd 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.0-SNAPSHOT' +version = '1.0-kafkatemplate-SNAPSHOT' java { toolchain { @@ -27,7 +27,7 @@ repositories { } dependencies { - implementation 'org.apache.kafka:kafka-clients' + implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-web' @@ -36,7 +36,6 @@ dependencies { annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' - testImplementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test' testCompileOnly 'org.projectlombok:lombok' testAnnotationProcessor 'org.projectlombok:lombok' diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 00f68fcc..2bb942bf 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,10 +136,10 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/spring-producer:1.0-kafkatemplate-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: producer + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: producer juplo.producer.topic: test consumer: diff --git a/pom.xml b/pom.xml index f64266b4..27951434 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,8 @@ de.juplo.kafka spring-producer Spring Producer - A Simple Producer, based on Spring Boot, that sends messages via Kafka - 1.0-SNAPSHOT + A Simple Producer, based on the KafkaTemplate and Spring Boot, that sends messages via Kafka + 1.0-kafkatemplate-SNAPSHOT 21 @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -53,11 +53,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0090ceea..4a5c8da2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,15 +1,13 @@ package de.juplo.kafka; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; import java.time.Duration; -import java.util.Properties; @Configuration @@ -19,38 +17,18 @@ public class ApplicationConfiguration @Bean public ExampleProducer exampleProducer( ApplicationProperties properties, - Producer kafkaProducer, + KafkaProperties kafkaProperties, + KafkaTemplate kafkaTemplate, ConfigurableApplicationContext applicationContext) { return new ExampleProducer( - properties.getClientId(), + kafkaProperties.getClientId(), properties.getProducerProperties().getTopic(), properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) : properties.getProducerProperties().getThrottle(), - kafkaProducer, + kafkaTemplate, () -> applicationContext.close()); } - - @Bean(destroyMethod = "") - public KafkaProducer kafkaProducer(ApplicationProperties properties) - { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getProducerProperties().getAcks()); - props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis()); - props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis()); - props.put("buffer.memory", properties.getProducerProperties().getBufferMemory()); - props.put("batch.size", properties.getProducerProperties().getBatchSize()); - props.put("metadata.max.age.ms", 5000); // 5 Sekunden - props.put("request.timeout.ms", 5000); // 5 Sekunden - props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); - props.put("compression.type", properties.getProducerProperties().getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - return new KafkaProducer<>(props); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 43232628..908072cb 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -16,13 +16,6 @@ import java.time.Duration; @Setter public class ApplicationProperties { - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - @NotNull private ProducerProperties producer; @@ -41,22 +34,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - @NotNull - @NotEmpty - private String acks; - @NotNull - private Duration deliveryTimeout; - @NotNull - private Duration maxBlock; - @NotNull - private Long bufferMemory; - @NotNull - private Integer batchSize; - @NotNull - private Duration linger; - @NotNull - @NotEmpty - private String compressionType; private Duration throttle; } } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 25e885d9..e71d25af 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -1,10 +1,12 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; import java.time.Duration; +import java.util.concurrent.CompletableFuture; @Slf4j @@ -13,7 +15,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final KafkaTemplate kafkaTemplate; private final Thread workerThread; private final Runnable closeCallback; @@ -25,13 +27,13 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + KafkaTemplate kafkaTemplate, Runnable closeCallback) { this.id = id; this.topic = topic; this.throttle = throttle; - this.producer = producer; + this.kafkaTemplate = kafkaTemplate; workerThread = new Thread(this, "ExampleProducer Worker-Thread"); workerThread.start(); @@ -72,8 +74,6 @@ public class ExampleProducer implements Runnable } finally { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } } @@ -82,18 +82,13 @@ public class ExampleProducer implements Runnable { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> + kafkaTemplate.send(topic, key, value).whenComplete((result, e) -> { long now = System.currentTimeMillis(); if (e == null) { // HANDLE SUCCESS + RecordMetadata metadata = result.getRecordMetadata(); produced++; log.debug( "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 98ea1284..f5adc122 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,16 +1,20 @@ juplo: - bootstrap-server: :9092 - client-id: DEV producer: topic: test - acks: -1 - delivery-timeout: 10s - max-block: 5s - buffer-memory: 33554432 - batch-size: 16384 - linger: 0 - compression-type: gzip throttle: 500 +spring: + kafka: + bootstrap-servers: :9092 + client-id: DEV + producer: + acks: -1 + buffer-memory: 33554432 + batch-size: 16384 + compression-type: gzip + properties: + delivery-timeout: 10s + max-block: 5s + linger: 0 management: endpoint: shutdown: @@ -26,17 +30,8 @@ management: enabled: true info: kafka: - bootstrap-server: ${juplo.bootstrap-server} - client-id: ${juplo.client-id} producer: topic: ${juplo.producer.topic} - acks: ${juplo.producer.acks} - delivery-timeout: ${juplo.producer.delivery-timeout} - max-block: ${juplo.producer.max-block} - buffer-memory: ${juplo.producer.buffer-memory} - batch-size: ${juplo.producer.batch-size} - linger: ${juplo.producer.linger} - compression-type: ${juplo.producer.compression-type} throttle: ${juplo.producer.throttle} logging: level: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 29ca80a7..7687e9ca 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -27,9 +27,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.consumer.auto-offset-reset=earliest", - "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.producer.topic=" + TOPIC}) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -- 2.20.1