X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FResultRecorder.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FResultRecorder.java;h=d20ee897346c92ce98dbeeb84791bd90ed86a945;hb=6369c42cebd818fda8c518813443eb907629ce41;hp=0000000000000000000000000000000000000000;hpb=da2a54acdaa2e7ad6e77e93f111cf27603617d23;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ResultRecorder.java b/src/main/java/de/juplo/kafka/ResultRecorder.java new file mode 100644 index 0000000..d20ee89 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ResultRecorder.java @@ -0,0 +1,74 @@ +package de.juplo.kafka; + +import lombok.Getter; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.Arrays; + + +class ResultRecorder +{ + @Getter + private final DeferredResult deferredResult = new DeferredResult(); + private final int numMessages; + private final RecordMetadata[] metadata; + private final Exception[] errors; + + private int sent = 0; + + + ResultRecorder(int numMessages) + { + this.numMessages = numMessages; + this.metadata = new RecordMetadata[numMessages]; + this.errors = new Exception[numMessages]; + } + + + void addSuccess(RecordMetadata m) + { + checkStatus(); + metadata[sent++] = m; + processResult(); + } + + void addFailure(Exception e) + { + checkStatus(); + errors[sent++] = e; + processResult(); + } + + private void checkStatus() throws IllegalStateException + { + if (sent >= numMessages) + throw new IllegalStateException("Already sent " + sent + " messages!"); + } + + private void processResult() + { + if (sent == numMessages) + { + if (Arrays + .stream(errors) + .filter(e -> e != null) + .findAny() + .isPresent()) + { + deferredResult.setErrorResult(new ProduceFailure(errors)); + } + else + { + Integer[] partitions = new Integer[numMessages]; + Long[] offsets = new Long[numMessages]; + for (int i = 0; i < numMessages; i++) + { + partitions[i] = metadata[i].partition(); + offsets[i] = metadata[i].offset(); + } + deferredResult.setResult(new ProduceSuccess(partitions, offsets)); + } + } + } +}