X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FRestProducer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FRestProducer.java;h=56758f8151dc662c597387fbec599048ddb6aa56;hb=9b2b80f42860bcc954edf72cae931e9ba9f2e4c5;hp=56f3382415e4d130e03c69d2e7a1e8ad8818c119;hpb=96013a5bc40a65eb35713bccc756eea03c4f3de7;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index 56f3382..56758f8 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -1,7 +1,6 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.http.HttpStatus; import org.springframework.kafka.core.KafkaTemplate; @@ -37,26 +36,14 @@ public class RestProducer @RequestBody String value) { key = key.trim(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - new ClientMessage(key, value) // Value - ); - - return send(record); + return send(key, new ClientMessage(key, value)); } @PutMapping(path = "{key}") public DeferredResult message(@PathVariable String key) { key = key.trim(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - new FooMessage(key, System.currentTimeMillis()) // Value - ); - - return send(record); + return send(key, new FooMessage(key, System.currentTimeMillis())); } @PostMapping(path = "/") @@ -64,22 +51,16 @@ public class RestProducer @RequestBody String name) { name = name.trim(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - name, // Key - new Greeting(name) // Value - ); - - return send(record); + return send(name, new Greeting(name)); } - private DeferredResult send(ProducerRecord record) + private DeferredResult send(String key, Object value) { DeferredResult result = new DeferredResult<>(); final long time = System.currentTimeMillis(); - kafkaTemplate.send(record).addCallback( + kafkaTemplate.send(topic, key, value).addCallback( (sendResult) -> { long now = System.currentTimeMillis(); @@ -91,8 +72,8 @@ public class RestProducer log.debug( "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", id, - record.key(), - record.value(), + key, + value, metadata.partition(), metadata.offset(), metadata.timestamp(), @@ -108,7 +89,7 @@ public class RestProducer log.error( "{} - ERROR key={} timestamp=-1 latency={}ms: {}", id, - record.key(), + key, now - time, e.toString() ); @@ -118,7 +99,7 @@ public class RestProducer log.trace( "{} - Queued key={} latency={}ms", id, - record.key(), + key, now - time );