import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.HttpStatus;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
public class RestGateway
{
private final String id;
- private final String topic;
private final Integer partition;
- private final Producer<String, Integer> producer;
+ private final KafkaTemplate<String, Integer> kafkaTemplate;
private long produced = 0;
final long time = System.currentTimeMillis();
- final ProducerRecord<String, Integer> record = new ProducerRecord<>(
- topic, // Topic
- partition, // Partition - Uses default-algorithm, if null
- key, // Key
- value // Value
- );
-
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
- {
- // HANDLE SUCCESS
- produced++;
- result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
- log.debug(
- "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
- id,
- record.key(),
- record.value(),
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
- }
- else
- {
- // HANDLE ERROR
- result.setErrorResult(new ProduceFailure(e));
- log.error(
- "{} - ERROR key={} timestamp={} latency={}ms: {}",
- id,
- record.key(),
- metadata == null ? -1 : metadata.timestamp(),
- now - time,
- e.toString()
- );
- }
- });
+ ListenableFuture<SendResult<String, Integer>> future =
+ kafkaTemplate.send(null, partition, key, value);
long now = System.currentTimeMillis();
+
+ future.addCallback(
+ sendResult ->
+ {
+ // HANDLE SUCCESS
+ produced++;
+ RecordMetadata metadata = sendResult.getRecordMetadata();
+ ProducerRecord<String, Integer> record = sendResult.getProducerRecord();
+ result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+ log.debug(
+ "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+ id,
+ record.key(),
+ record.value(),
+ metadata.partition(),
+ metadata.offset(),
+ metadata.timestamp(),
+ now - time
+ );
+ },
+ e->
+ {
+ // HANDLE ERROR
+ result.setErrorResult(new ProduceFailure(e));
+ log.error(
+ "{} - ERROR key={} latency={}ms: {}",
+ id,
+ key,
+ now - time,
+ e.toString()
+ );
+ });
+
log.trace(
"{} - Queued message with key={} latency={}ms",
id,
- record.key(),
+ key,
now - time
);