Version des Rest-Producers, der direkt Requests für den Sumup-Adder sendet
[demos/kafka/training] / src / main / java / de / juplo / kafka / ResultRecorder.java
diff --git a/src/main/java/de/juplo/kafka/ResultRecorder.java b/src/main/java/de/juplo/kafka/ResultRecorder.java
new file mode 100644 (file)
index 0000000..d20ee89
--- /dev/null
@@ -0,0 +1,74 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.Arrays;
+
+
+class ResultRecorder
+{
+  @Getter
+  private final DeferredResult<ProduceResult> deferredResult = new DeferredResult<ProduceResult>();
+  private final int numMessages;
+  private final RecordMetadata[] metadata;
+  private final Exception[] errors;
+
+  private int sent = 0;
+
+
+  ResultRecorder(int numMessages)
+  {
+    this.numMessages = numMessages;
+    this.metadata = new RecordMetadata[numMessages];
+    this.errors = new Exception[numMessages];
+  }
+
+
+  void addSuccess(RecordMetadata m)
+  {
+    checkStatus();
+    metadata[sent++] = m;
+    processResult();
+  }
+
+  void addFailure(Exception e)
+  {
+    checkStatus();
+    errors[sent++] = e;
+    processResult();
+  }
+
+  private void checkStatus() throws IllegalStateException
+  {
+    if (sent >= numMessages)
+      throw new IllegalStateException("Already sent " + sent + " messages!");
+  }
+
+  private void processResult()
+  {
+    if (sent == numMessages)
+    {
+      if (Arrays
+          .stream(errors)
+          .filter(e -> e != null)
+          .findAny()
+          .isPresent())
+      {
+        deferredResult.setErrorResult(new ProduceFailure(errors));
+      }
+      else
+      {
+        Integer[] partitions = new Integer[numMessages];
+        Long[] offsets = new Long[numMessages];
+        for (int i = 0; i < numMessages; i++)
+        {
+          partitions[i] = metadata[i].partition();
+          offsets[i] = metadata[i].offset();
+        }
+        deferredResult.setResult(new ProduceSuccess(partitions, offsets));
+      }
+    }
+  }
+}