X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FRestGateway.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FRestGateway.java;h=2f2b18c6448232309ad70da377b94667909743df;hb=d0de5c9030e575251d62d2a207ede84cd881ccec;hp=53a87df7fc044802ad7045e4fdfb8debce91b057;hpb=f348b232d39c0b03f21a4d1e083fc6f2b44468f0;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/RestGateway.java b/src/main/java/de/juplo/kafka/RestGateway.java index 53a87df..2f2b18c 100644 --- a/src/main/java/de/juplo/kafka/RestGateway.java +++ b/src/main/java/de/juplo/kafka/RestGateway.java @@ -2,9 +2,12 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.http.HttpStatus; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; @@ -16,9 +19,8 @@ import org.springframework.web.context.request.async.DeferredResult; public class RestGateway { private final String id; - private final String topic; private final Integer partition; - private final Producer producer; + private final KafkaTemplate kafkaTemplate; private long produced = 0; @@ -32,52 +34,47 @@ public class RestGateway final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - partition, // Partition - Uses default-algorithm, if null - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); - log.debug( - "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); - log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", - id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - }); + ListenableFuture> future = + kafkaTemplate.send(null, partition, key, value); long now = System.currentTimeMillis(); + + future.addCallback( + sendResult -> + { + // HANDLE SUCCESS + produced++; + RecordMetadata metadata = sendResult.getRecordMetadata(); + ProducerRecord record = sendResult.getProducerRecord(); + result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); + log.debug( + "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", + id, + record.key(), + record.value(), + metadata.partition(), + metadata.offset(), + metadata.timestamp(), + now - time + ); + }, + e-> + { + // HANDLE ERROR + result.setErrorResult(new ProduceFailure(e)); + log.error( + "{} - ERROR key={} latency={}ms: {}", + id, + key, + now - time, + e.toString() + ); + }); + log.trace( "{} - Queued message with key={} latency={}ms", id, - record.key(), + key, now - time );