From 3ccb6d362b851e839244811e5fe34e4bf51b4586 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 14 Oct 2023 16:24:47 +0200 Subject: [PATCH] Spring-Version des Endless-Stream-Producer's --- README.sh | 2 +- docker-compose.yml | 8 +-- pom.xml | 8 +-- src/main/java/de/juplo/kafka/Application.java | 23 +++---- .../de/juplo/kafka/ApplicationProperties.java | 4 -- .../java/de/juplo/kafka/EndlessProducer.java | 62 +++++++------------ src/main/resources/application.yml | 21 ++++--- 7 files changed, 54 insertions(+), 74 deletions(-) diff --git a/README.sh b/README.sh index 73ceebc..2742603 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/endless-producer:1.0-SNAPSHOT +IMAGE=juplo/endless-stream-spring-producer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker-compose.yml b/docker-compose.yml index a368379..d0c3740 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,12 +37,12 @@ services: command: sleep infinity producer: - image: juplo/endless-producer:1.0-SNAPSHOT + image: juplo/endless-stream-spring-producer:1.0-SNAPSHOT ports: - 8080:8080 environment: server.port: 8080 - producer.bootstrap-server: kafka:9092 - producer.client-id: producer - producer.topic: test + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: producer + spring.kafka.template.default-topic: test producer.throttle-ms: 200 diff --git a/pom.xml b/pom.xml index af4a89f..7829fd3 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,8 @@ de.juplo.kafka - endless-producer - Endless Producer: a Simple Producer that endlessly writes numbers into a topic + endless-stream-spring-producer + Endless Producer: a Spring Producer that endlessly writes numbers into a topic 1.0-SNAPSHOT @@ -31,8 +31,8 @@ true - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index bc617a8..0719459 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,10 +1,11 @@ package de.juplo.kafka; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.util.Assert; import java.util.concurrent.Executors; @@ -14,25 +15,19 @@ import java.util.concurrent.Executors; @EnableConfigurationProperties(ApplicationProperties.class) public class Application { - @Autowired - ApplicationProperties properties; - - @Bean - public EndlessProducer producer() + public EndlessProducer producer( + ApplicationProperties properties, + @Value("${spring.kafka.client-id:DEV}") String clientId, + KafkaTemplate kafkaTemplate) { - Assert.hasText(properties.getBootstrapServer(), "producer.bootstrap-server must be set"); - Assert.hasText(properties.getClientId(), "producer.client-id must be set"); - Assert.hasText(properties.getTopic(), "producer.topic must be set"); EndlessProducer producer = new EndlessProducer( Executors.newFixedThreadPool(1), - properties.getBootstrapServer(), - properties.getClientId(), - properties.getTopic(), - properties.getAcks(), - properties.getThrottleMs()); + clientId, + properties.getThrottleMs(), + kafkaTemplate); producer.start(); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index ab26890..22b506e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -9,9 +9,5 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @Setter public class ApplicationProperties { - private String bootstrapServer; - private String clientId; - private String topic; - private String acks; private int throttleMs; } diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java index 7a5b324..30fefab 100644 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -1,12 +1,13 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.util.concurrent.ListenableFuture; import javax.annotation.PreDestroy; -import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -16,9 +17,8 @@ public class EndlessProducer implements Runnable { private final ExecutorService executor; private final String id; - private final String topic; private final int throttleMs; - private final KafkaProducer producer; + private final KafkaTemplate kafkaTemplate; private boolean running = false; private long i = 0; @@ -26,25 +26,14 @@ public class EndlessProducer implements Runnable public EndlessProducer( ExecutorService executor, - String bootstrapServer, String clientId, - String topic, - String acks, - int throttleMs) + int throttleMs, + KafkaTemplate kafkaTemplate) { this.executor = executor; this.id = clientId; - this.topic = topic; this.throttleMs = throttleMs; - - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServer); - props.put("client.id", clientId); - props.put("acks", acks); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - this.producer = new KafkaProducer<>(props); + this.kafkaTemplate = kafkaTemplate; } @Override @@ -89,17 +78,14 @@ public class EndlessProducer implements Runnable { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) + ListenableFuture> listenableFuture = kafkaTemplate.sendDefault(key, value); + listenableFuture.addCallback( + result -> { + long now = System.currentTimeMillis(); + RecordMetadata metadata = result.getRecordMetadata(); + ProducerRecord record = result.getProducerRecord(); + // HANDLE SUCCESS produced++; log.debug( @@ -112,27 +98,27 @@ public class EndlessProducer implements Runnable metadata.timestamp(), now - time ); - } - else + }, + e -> { + long now = System.currentTimeMillis(); + // HANDLE ERROR log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", + "{} - ERROR key={} latency={}ms: {}", id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), + key, now - time, e.toString() ); - } - }); + }); long now = System.currentTimeMillis(); log.trace( "{} - Queued #{} key={} latency={}ms", id, value, - record.key(), + key, now - time ); } @@ -170,8 +156,6 @@ public class EndlessProducer implements Runnable } finally { - log.info("{} - Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7dd385b..b95d6b5 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,8 +1,4 @@ producer: - bootstrap-server: :9092 - client-id: DEV - topic: test - acks: 1 throttle-ms: 1000 management: endpoint: @@ -19,11 +15,20 @@ management: enabled: true info: kafka: - bootstrap-server: ${producer.bootstrap-server} - client-id: ${producer.client-id} - topic: ${producer.topic} - acks: ${producer.acks} + bootstrap-servers: ${spring.kafka.bootstrap-servers} + client-id: ${spring.kafka.client-id} + topic: ${spring.kafka.template.default-topic} + acks: ${spring.kafka.producer.acks} throttle-ms: ${producer.throttle-ms} + batch-size: ${spring.kafka.producer.batch-size} + linger-ms: ${spring.kafka.producer.properties.linger.ms} + compression-type: ${spring.kafka.producer.compression-type} +spring: + kafka: + bootstrap-servers: :9092 + client-id: DEV + template: + default-topic: test logging: level: root: INFO -- 2.20.1