1 package de.juplo.kafka;
4 import org.apache.kafka.clients.producer.RecordMetadata;
5 import org.springframework.web.context.request.async.DeferredResult;
7 import java.util.Arrays;
13 private final DeferredResult<ProduceResult> deferredResult = new DeferredResult<ProduceResult>();
14 private final int numMessages;
15 private final RecordMetadata[] metadata;
16 private final Exception[] errors;
21 ResultRecorder(int numMessages)
23 this.numMessages = numMessages;
24 this.metadata = new RecordMetadata[numMessages];
25 this.errors = new Exception[numMessages];
29 void addSuccess(RecordMetadata m)
36 void addFailure(Exception e)
43 private void checkStatus() throws IllegalStateException
45 if (sent >= numMessages)
46 throw new IllegalStateException("Already sent " + sent + " messages!");
49 private void processResult()
51 if (sent == numMessages)
55 .filter(e -> e != null)
59 deferredResult.setErrorResult(new ProduceFailure(errors));
63 Integer[] partitions = new Integer[numMessages];
64 Long[] offsets = new Long[numMessages];
65 for (int i = 0; i < numMessages; i++)
67 partitions[i] = metadata[i].partition();
68 offsets[i] = metadata[i].offset();
70 deferredResult.setResult(new ProduceSuccess(partitions, offsets));