X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessProducer.java;h=30fefab1dcaff70ace4bf61d5a9d44b826f9555d;hb=3ccb6d362b851e839244811e5fe34e4bf51b4586;hp=7a5b324e144fddbaef84c036640e62448c6f7a4f;hpb=26cef68053cc7472055344b171a44f34e7592ebb;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java index 7a5b324..30fefab 100644 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -1,12 +1,13 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.util.concurrent.ListenableFuture; import javax.annotation.PreDestroy; -import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -16,9 +17,8 @@ public class EndlessProducer implements Runnable { private final ExecutorService executor; private final String id; - private final String topic; private final int throttleMs; - private final KafkaProducer producer; + private final KafkaTemplate kafkaTemplate; private boolean running = false; private long i = 0; @@ -26,25 +26,14 @@ public class EndlessProducer implements Runnable public EndlessProducer( ExecutorService executor, - String bootstrapServer, String clientId, - String topic, - String acks, - int throttleMs) + int throttleMs, + KafkaTemplate kafkaTemplate) { this.executor = executor; this.id = clientId; - this.topic = topic; this.throttleMs = throttleMs; - - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServer); - props.put("client.id", clientId); - props.put("acks", acks); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - this.producer = new KafkaProducer<>(props); + this.kafkaTemplate = kafkaTemplate; } @Override @@ -89,17 +78,14 @@ public class EndlessProducer implements Runnable { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) + ListenableFuture> listenableFuture = kafkaTemplate.sendDefault(key, value); + listenableFuture.addCallback( + result -> { + long now = System.currentTimeMillis(); + RecordMetadata metadata = result.getRecordMetadata(); + ProducerRecord record = result.getProducerRecord(); + // HANDLE SUCCESS produced++; log.debug( @@ -112,27 +98,27 @@ public class EndlessProducer implements Runnable metadata.timestamp(), now - time ); - } - else + }, + e -> { + long now = System.currentTimeMillis(); + // HANDLE ERROR log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", + "{} - ERROR key={} latency={}ms: {}", id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), + key, now - time, e.toString() ); - } - }); + }); long now = System.currentTimeMillis(); log.trace( "{} - Queued #{} key={} latency={}ms", id, value, - record.key(), + key, now - time ); } @@ -170,8 +156,6 @@ public class EndlessProducer implements Runnable } finally { - log.info("{} - Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } }