Version des Rest-Producers, der direkt Requests für den Sumup-Adder sendet
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestProducer.java
index debe366..4be2dcd 100644 (file)
@@ -20,7 +20,7 @@ public class RestProducer
   private final String id;
   private final String topic;
   private final Integer partition;
-  private final KafkaProducer<String, String> producer;
+  private final KafkaProducer<String, Object> producer;
 
   private long produced = 0;
 
@@ -28,13 +28,28 @@ public class RestProducer
   public DeferredResult<ProduceResult> send(
       @PathVariable String key,
       @RequestHeader(name = "X-id", required = false) Long correlationId,
-      @RequestBody String value)
+      @RequestBody Integer number)
   {
-    DeferredResult<ProduceResult> result = new DeferredResult<>();
+    ResultRecorder result = new ResultRecorder(number+1);
 
+    for (int i = 1; i <= number; i++)
+    {
+      send(key, new AddNumberMessage(number, i), correlationId, result);
+    }
+    send(key, new CalculateSumMessage(number), correlationId, result);
+
+    return result.getDeferredResult();
+  }
+
+  private void send(
+      String key,
+      Object value,
+      Long correlationId,
+      ResultRecorder result)
+  {
     final long time = System.currentTimeMillis();
 
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
+    final ProducerRecord<String, Object> record = new ProducerRecord<>(
         topic,  // Topic
         partition, // Partition
         key,    // Key
@@ -53,8 +68,8 @@ public class RestProducer
       if (e == null)
       {
         // HANDLE SUCCESS
+        result.addSuccess(metadata);
         produced++;
-        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
         log.debug(
             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
             id,
@@ -69,7 +84,7 @@ public class RestProducer
       else
       {
         // HANDLE ERROR
-        result.setErrorResult(new ProduceFailure(e));
+        result.addFailure(e);
         log.error(
             "{} - ERROR key={} timestamp={} latency={}ms: {}",
             id,
@@ -88,8 +103,6 @@ public class RestProducer
         record.key(),
         now - time
     );
-
-    return result;
   }
 
   @ExceptionHandler