WIP
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestGateway.java
index 53a87df..2f2b18c 100644 (file)
@@ -2,9 +2,12 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.http.HttpStatus;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
@@ -16,9 +19,8 @@ import org.springframework.web.context.request.async.DeferredResult;
 public class RestGateway
 {
   private final String id;
-  private final String topic;
   private final Integer partition;
-  private final Producer<String, Integer> producer;
+  private final KafkaTemplate<String, Integer> kafkaTemplate;
 
   private long produced = 0;
 
@@ -32,52 +34,47 @@ public class RestGateway
 
     final long time = System.currentTimeMillis();
 
-    final ProducerRecord<String, Integer> record = new ProducerRecord<>(
-        topic,  // Topic
-        partition, // Partition - Uses default-algorithm, if null
-        key,    // Key
-        value   // Value
-    );
-
-    producer.send(record, (metadata, e) ->
-    {
-      long now = System.currentTimeMillis();
-      if (e == null)
-      {
-        // HANDLE SUCCESS
-        produced++;
-        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
-        log.debug(
-            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
-            id,
-            record.key(),
-            record.value(),
-            metadata.partition(),
-            metadata.offset(),
-            metadata.timestamp(),
-            now - time
-        );
-      }
-      else
-      {
-        // HANDLE ERROR
-        result.setErrorResult(new ProduceFailure(e));
-        log.error(
-            "{} - ERROR key={} timestamp={} latency={}ms: {}",
-            id,
-            record.key(),
-            metadata == null ? -1 : metadata.timestamp(),
-            now - time,
-            e.toString()
-        );
-      }
-    });
+    ListenableFuture<SendResult<String, Integer>> future =
+        kafkaTemplate.send(null, partition, key, value);
 
     long now = System.currentTimeMillis();
+
+    future.addCallback(
+        sendResult ->
+        {
+          // HANDLE SUCCESS
+          produced++;
+          RecordMetadata metadata = sendResult.getRecordMetadata();
+          ProducerRecord<String, Integer> record = sendResult.getProducerRecord();
+          result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+          log.debug(
+              "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+              id,
+              record.key(),
+              record.value(),
+              metadata.partition(),
+              metadata.offset(),
+              metadata.timestamp(),
+              now - time
+          );
+        },
+        e->
+        {
+          // HANDLE ERROR
+          result.setErrorResult(new ProduceFailure(e));
+          log.error(
+              "{} - ERROR key={} latency={}ms: {}",
+              id,
+              key,
+              now - time,
+              e.toString()
+          );
+        });
+
     log.trace(
         "{} - Queued message with key={} latency={}ms",
         id,
-        record.key(),
+        key,
         now - time
     );