X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FRestProducer.java;h=423a8a3cdbec74975e33fb69c16b9d8b94d04b64;hb=56bf19f4f150e7ab97eed32f01a1f470b9f896a6;hp=56f3382415e4d130e03c69d2e7a1e8ad8818c119;hpb=96013a5bc40a65eb35713bccc756eea03c4f3de7;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index 56f3382..423a8a3 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -1,7 +1,6 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.http.HttpStatus; import org.springframework.kafka.core.KafkaTemplate; @@ -17,7 +16,6 @@ import java.util.concurrent.ExecutionException; public class RestProducer { private final String id; - private final String topic; private final KafkaTemplate kafkaTemplate; private long produced = 0; @@ -27,7 +25,6 @@ public class RestProducer KafkaTemplate kafkaTemplate) { this.id = properties.getClientId(); - this.topic = properties.getTopic(); this.kafkaTemplate = kafkaTemplate; } @@ -37,26 +34,14 @@ public class RestProducer @RequestBody String value) { key = key.trim(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - new ClientMessage(key, value) // Value - ); - - return send(record); + return send(key, new ClientMessage(key, value)); } @PutMapping(path = "{key}") public DeferredResult message(@PathVariable String key) { key = key.trim(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - new FooMessage(key, System.currentTimeMillis()) // Value - ); - - return send(record); + return send(key, new FooMessage(key, System.currentTimeMillis())); } @PostMapping(path = "/") @@ -64,22 +49,16 @@ public class RestProducer @RequestBody String name) { name = name.trim(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - name, // Key - new Greeting(name) // Value - ); - - return send(record); + return send(name, new Greeting(name)); } - private DeferredResult send(ProducerRecord record) + private DeferredResult send(String key, Object value) { DeferredResult result = new DeferredResult<>(); final long time = System.currentTimeMillis(); - kafkaTemplate.send(record).addCallback( + kafkaTemplate.sendDefault(key, value).addCallback( (sendResult) -> { long now = System.currentTimeMillis(); @@ -91,8 +70,8 @@ public class RestProducer log.debug( "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", id, - record.key(), - record.value(), + key, + value, metadata.partition(), metadata.offset(), metadata.timestamp(), @@ -108,7 +87,7 @@ public class RestProducer log.error( "{} - ERROR key={} timestamp=-1 latency={}ms: {}", id, - record.key(), + key, now - time, e.toString() ); @@ -118,7 +97,7 @@ public class RestProducer log.trace( "{} - Queued key={} latency={}ms", id, - record.key(), + key, now - time );