d20ee897346c92ce98dbeeb84791bd90ed86a945
[demos/kafka/training] / src / main / java / de / juplo / kafka / ResultRecorder.java
1 package de.juplo.kafka;
2
3 import lombok.Getter;
4 import org.apache.kafka.clients.producer.RecordMetadata;
5 import org.springframework.web.context.request.async.DeferredResult;
6
7 import java.util.Arrays;
8
9
10 class ResultRecorder
11 {
12   @Getter
13   private final DeferredResult<ProduceResult> deferredResult = new DeferredResult<ProduceResult>();
14   private final int numMessages;
15   private final RecordMetadata[] metadata;
16   private final Exception[] errors;
17
18   private int sent = 0;
19
20
21   ResultRecorder(int numMessages)
22   {
23     this.numMessages = numMessages;
24     this.metadata = new RecordMetadata[numMessages];
25     this.errors = new Exception[numMessages];
26   }
27
28
29   void addSuccess(RecordMetadata m)
30   {
31     checkStatus();
32     metadata[sent++] = m;
33     processResult();
34   }
35
36   void addFailure(Exception e)
37   {
38     checkStatus();
39     errors[sent++] = e;
40     processResult();
41   }
42
43   private void checkStatus() throws IllegalStateException
44   {
45     if (sent >= numMessages)
46       throw new IllegalStateException("Already sent " + sent + " messages!");
47   }
48
49   private void processResult()
50   {
51     if (sent == numMessages)
52     {
53       if (Arrays
54           .stream(errors)
55           .filter(e -> e != null)
56           .findAny()
57           .isPresent())
58       {
59         deferredResult.setErrorResult(new ProduceFailure(errors));
60       }
61       else
62       {
63         Integer[] partitions = new Integer[numMessages];
64         Long[] offsets = new Long[numMessages];
65         for (int i = 0; i < numMessages; i++)
66         {
67           partitions[i] = metadata[i].partition();
68           offsets[i] = metadata[i].offset();
69         }
70         deferredResult.setResult(new ProduceSuccess(partitions, offsets));
71       }
72     }
73   }
74 }