From: Kai Moritz Date: Sat, 18 Jan 2025 15:44:08 +0000 (+0100) Subject: `ExampleProducer` auf das `KafkaTemplate` umgestellt X-Git-Tag: spring/spring-producer--kafkatemplate--2025-04-signal-spickzettel X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fspring%2Fspring-producer--kafkatemplate;p=demos%2Fkafka%2Ftraining `ExampleProducer` auf das `KafkaTemplate` umgestellt --- diff --git a/README.sh b/README.sh index 77fc3743..b05f08c6 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:2.0-SNAPSHOT +IMAGE=juplo/spring-producer:2.0-kafkatemplate-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 1429c4dd..ca6d64dd 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.0-SNAPSHOT' +version = '1.0-kafkatemplate-SNAPSHOT' java { toolchain { @@ -27,7 +27,7 @@ repositories { } dependencies { - implementation 'org.apache.kafka:kafka-clients' + implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-web' @@ -36,7 +36,6 @@ dependencies { annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' - testImplementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test' testCompileOnly 'org.projectlombok:lombok' testAnnotationProcessor 'org.projectlombok:lombok' diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 59e7ed57..ee3e7258 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,7 +136,7 @@ services: - kafka-3 producer: - image: juplo/spring-producer:2.0-SNAPSHOT + image: juplo/spring-producer:2.0-kafkatemplate-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: producer diff --git a/pom.xml b/pom.xml index be7dea5d..34a3f237 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,8 @@ de.juplo.kafka spring-producer Spring Producer - A Simple Producer, based on Spring Boot, that sends messages via Kafka - 2.0-SNAPSHOT + A Simple Producer, based on the KafkaTemplate and Spring Boot, that sends messages via Kafka + 2.0-kafkatemplate-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index efdfafa1..f7ad6594 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,12 +1,11 @@ package de.juplo.kafka; -import org.apache.kafka.clients.producer.Producer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; import java.time.Duration; @@ -19,7 +18,7 @@ public class ApplicationConfiguration public ExampleProducer exampleProducer( @Value("${spring.kafka.client-id}") String clientId, ApplicationProperties properties, - Producer kafkaProducer, + KafkaTemplate kafkaTemplate, ConfigurableApplicationContext applicationContext) { return @@ -29,13 +28,7 @@ public class ApplicationConfiguration properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) : properties.getProducerProperties().getThrottle(), - kafkaProducer, + kafkaTemplate, () -> applicationContext.close()); } - - @Bean(destroyMethod = "") - public Producer kafkaProducer(ProducerFactory producerFactory) - { - return producerFactory.createProducer(); - } } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index c5a5a80d..64d0b51e 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -1,8 +1,8 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.core.KafkaTemplate; import java.time.Duration; @@ -13,7 +13,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final KafkaTemplate kafkaTemplate; private final Thread workerThread; private final Runnable closeCallback; @@ -25,13 +25,13 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + KafkaTemplate kafkaTemplate, Runnable closeCallback) { this.id = id; this.topic = topic; this.throttle = throttle; - this.producer = producer; + this.kafkaTemplate = kafkaTemplate; workerThread = new Thread(this, "ExampleProducer Worker-Thread"); workerThread.start(); @@ -72,8 +72,6 @@ public class ExampleProducer implements Runnable } finally { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } } @@ -82,18 +80,13 @@ public class ExampleProducer implements Runnable { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> + kafkaTemplate.send(topic, key, value).whenComplete((result, e) -> { long now = System.currentTimeMillis(); if (e == null) { // HANDLE SUCCESS + RecordMetadata metadata = result.getRecordMetadata(); log.debug( "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", id, diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 3a4493ec..7687e9ca 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -27,7 +27,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.consumer.auto-offset-reset=earliest", "juplo.producer.topic=" + TOPIC}) @AutoConfigureMockMvc