From f8f517eca3532d450e81dcccef224ed43c208930 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 18 Jan 2025 16:44:08 +0100 Subject: [PATCH] `ExampleProducer` auf das `KafkaTemplate` umgestellt --- README.sh | 2 +- docker/docker-compose.yml | 6 +-- pom.xml | 15 +++----- .../juplo/kafka/ApplicationConfiguration.java | 34 +++-------------- .../de/juplo/kafka/ApplicationProperties.java | 23 ------------ .../java/de/juplo/kafka/ExampleProducer.java | 37 +++++++++---------- src/main/resources/application.yml | 31 +++++++--------- .../java/de/juplo/kafka/ApplicationTests.java | 2 - 8 files changed, 45 insertions(+), 105 deletions(-) diff --git a/README.sh b/README.sh index 61526553..21866c4c 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-SNAPSHOT +IMAGE=juplo/kafka-template:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 00f68fcc..b8aaaa8b 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,10 +136,10 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/kafka-template:1.0-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: producer + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: producer juplo.producer.topic: test consumer: diff --git a/pom.xml b/pom.xml index 23889513..861559ce 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - spring-producer - Spring Producer - A Simple Producer, based on Spring Boot, that sends messages via Kafka + kafka-template + Kafka Template + A Simple Producer, based on the KafkaTemplate, that sends messages via Kafka 1.0-SNAPSHOT @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -52,11 +52,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 7540dd33..7fa97ef4 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,15 +1,13 @@ package de.juplo.kafka; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; import java.time.Duration; -import java.util.Properties; @Configuration @@ -19,38 +17,18 @@ public class ApplicationConfiguration @Bean public ExampleProducer exampleProducer( ApplicationProperties properties, - Producer kafkaProducer, + KafkaProperties kafkaProperties, + KafkaTemplate kafkaTemplate, ConfigurableApplicationContext applicationContext) { return new ExampleProducer( - properties.getClientId(), + kafkaProperties.getClientId(), properties.getProducerProperties().getTopic(), properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) : properties.getProducerProperties().getThrottle(), - kafkaProducer, + kafkaTemplate, () -> applicationContext.close()); } - - @Bean(destroyMethod = "") - public KafkaProducer kafkaProducer(ApplicationProperties properties) - { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getProducerProperties().getAcks()); - props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis()); - props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis()); - props.put("buffer.memory", properties.getProducerProperties().getBufferMemory()); - props.put("batch.size", properties.getProducerProperties().getBatchSize()); - props.put("metadata.max.age.ms", 5000); // 5 Sekunden - props.put("request.timeout.ms", 5000); // 5 Sekunden - props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); - props.put("compression.type", properties.getProducerProperties().getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - return new KafkaProducer<>(props); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 43232628..908072cb 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -16,13 +16,6 @@ import java.time.Duration; @Setter public class ApplicationProperties { - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - @NotNull private ProducerProperties producer; @@ -41,22 +34,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - @NotNull - @NotEmpty - private String acks; - @NotNull - private Duration deliveryTimeout; - @NotNull - private Duration maxBlock; - @NotNull - private Long bufferMemory; - @NotNull - private Integer batchSize; - @NotNull - private Duration linger; - @NotNull - @NotEmpty - private String compressionType; private Duration throttle; } } diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 3fcd05d1..864eed2a 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -1,10 +1,12 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; import java.time.Duration; +import java.util.concurrent.CompletableFuture; @Slf4j @@ -13,7 +15,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final KafkaTemplate kafkaTemplate; private final Thread workerThread; private final Runnable closeCallback; @@ -25,13 +27,13 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + KafkaTemplate kafkaTemplate, Runnable closeCallback) { this.id = id; this.topic = topic; this.throttle = throttle; - this.producer = producer; + this.kafkaTemplate = kafkaTemplate; workerThread = new Thread(this, "ExampleProducer Worker-Thread"); workerThread.start(); @@ -72,8 +74,6 @@ public class ExampleProducer implements Runnable } finally { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } } @@ -82,18 +82,13 @@ public class ExampleProducer implements Runnable { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); + CompletableFuture> completableFuture = kafkaTemplate.send(topic, key, value); - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) + completableFuture.thenAccept(result -> { // HANDLE SUCCESS + long now = System.currentTimeMillis(); + RecordMetadata metadata = result.getRecordMetadata(); produced++; log.debug( "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", @@ -105,10 +100,12 @@ public class ExampleProducer implements Runnable metadata.timestamp(), now - time ); - } - else + }); + + completableFuture.exceptionally(e -> { // HANDLE ERROR + long now = System.currentTimeMillis(); log.error( "{} - ERROR for message {}={}, latency={}ms: {}", id, @@ -117,8 +114,8 @@ public class ExampleProducer implements Runnable now - time, e.toString() ); - } - }); + return null; + }); long now = System.currentTimeMillis(); log.trace( diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 98ea1284..f5adc122 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,16 +1,20 @@ juplo: - bootstrap-server: :9092 - client-id: DEV producer: topic: test - acks: -1 - delivery-timeout: 10s - max-block: 5s - buffer-memory: 33554432 - batch-size: 16384 - linger: 0 - compression-type: gzip throttle: 500 +spring: + kafka: + bootstrap-servers: :9092 + client-id: DEV + producer: + acks: -1 + buffer-memory: 33554432 + batch-size: 16384 + compression-type: gzip + properties: + delivery-timeout: 10s + max-block: 5s + linger: 0 management: endpoint: shutdown: @@ -26,17 +30,8 @@ management: enabled: true info: kafka: - bootstrap-server: ${juplo.bootstrap-server} - client-id: ${juplo.client-id} producer: topic: ${juplo.producer.topic} - acks: ${juplo.producer.acks} - delivery-timeout: ${juplo.producer.delivery-timeout} - max-block: ${juplo.producer.max-block} - buffer-memory: ${juplo.producer.buffer-memory} - batch-size: ${juplo.producer.batch-size} - linger: ${juplo.producer.linger} - compression-type: ${juplo.producer.compression-type} throttle: ${juplo.producer.throttle} logging: level: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index fe8609ec..2c8b34bf 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -27,9 +27,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.consumer.auto-offset-reset=earliest", - "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.producer.topic=" + TOPIC}) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -- 2.20.1