+package de.juplo.kafka;
+
+import lombok.Getter;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.Arrays;
+
+
+class ResultRecorder
+{
+ @Getter
+ private final DeferredResult<ProduceResult> deferredResult = new DeferredResult<ProduceResult>();
+ private final int numMessages;
+ private final RecordMetadata[] metadata;
+ private final Exception[] errors;
+
+ private int sent = 0;
+
+
+ ResultRecorder(int numMessages)
+ {
+ this.numMessages = numMessages;
+ this.metadata = new RecordMetadata[numMessages];
+ this.errors = new Exception[numMessages];
+ }
+
+
+ void addSuccess(RecordMetadata m)
+ {
+ checkStatus();
+ metadata[sent++] = m;
+ processResult();
+ }
+
+ void addFailure(Exception e)
+ {
+ checkStatus();
+ errors[sent++] = e;
+ processResult();
+ }
+
+ private void checkStatus() throws IllegalStateException
+ {
+ if (sent >= numMessages)
+ throw new IllegalStateException("Already sent " + sent + " messages!");
+ }
+
+ private void processResult()
+ {
+ if (sent == numMessages)
+ {
+ if (Arrays
+ .stream(errors)
+ .filter(e -> e != null)
+ .findAny()
+ .isPresent())
+ {
+ deferredResult.setErrorResult(new ProduceFailure(errors));
+ }
+ else
+ {
+ Integer[] partitions = new Integer[numMessages];
+ Long[] offsets = new Long[numMessages];
+ for (int i = 0; i < numMessages; i++)
+ {
+ partitions[i] = metadata[i].partition();
+ offsets[i] = metadata[i].offset();
+ }
+ deferredResult.setResult(new ProduceSuccess(partitions, offsets));
+ }
+ }
+ }
+}