Über `?error=1` kann ein Fehler in den Nachrichtenstrom eingebettet werden
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestProducer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.producer.Producer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.springframework.http.HttpStatus;
9 import org.springframework.web.bind.annotation.*;
10 import org.springframework.web.context.request.async.DeferredResult;
11
12 import java.math.BigInteger;
13
14
15 @Slf4j
16 @RequestMapping
17 @ResponseBody
18 @RequiredArgsConstructor
19 public class RestProducer
20 {
21   private final String id;
22   private final String topic;
23   private final Integer partition;
24   private final Producer<String, Object> producer;
25
26   private long produced = 0;
27
28   @PostMapping(path = "{key}")
29   public DeferredResult<ProduceResult> send(
30       @PathVariable String key,
31       @RequestHeader(name = "X-id", required = false) Long correlationId,
32       @RequestBody Integer number,
33       @RequestParam(required = false) boolean error)
34   {
35     ResultRecorder result = new ResultRecorder(number+1);
36
37     for (int i = 1; i <= number; i++)
38     {
39       int next = error && i == (number+1)/2 ? i * -1 : i;
40       send(key, new AddNumberMessage(number, next), correlationId, result);
41     }
42     send(key, new CalculateSumMessage(number), correlationId, result);
43
44     return result.getDeferredResult();
45   }
46
47   private void send(
48       String key,
49       Object value,
50       Long correlationId,
51       ResultRecorder result)
52   {
53     final long time = System.currentTimeMillis();
54
55     final ProducerRecord<String, Object> record = new ProducerRecord<>(
56         topic,  // Topic
57         partition, // Partition
58         key,    // Key
59         value   // Value
60     );
61
62     record.headers().add("source", id.getBytes());
63     if (correlationId != null)
64     {
65       record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
66     }
67
68     producer.send(record, (metadata, e) ->
69     {
70       long now = System.currentTimeMillis();
71       if (e == null)
72       {
73         // HANDLE SUCCESS
74         result.addSuccess(metadata);
75         produced++;
76         log.debug(
77             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
78             id,
79             record.key(),
80             record.value(),
81             metadata.partition(),
82             metadata.offset(),
83             metadata.timestamp(),
84             now - time
85         );
86       }
87       else
88       {
89         // HANDLE ERROR
90         result.addFailure(e);
91         log.error(
92             "{} - ERROR key={} timestamp={} latency={}ms: {}",
93             id,
94             record.key(),
95             metadata == null ? -1 : metadata.timestamp(),
96             now - time,
97             e.toString()
98         );
99       }
100     });
101
102     long now = System.currentTimeMillis();
103     log.trace(
104         "{} - Queued message with key={} latency={}ms",
105         id,
106         record.key(),
107         now - time
108     );
109   }
110
111   @ExceptionHandler
112   @ResponseStatus(HttpStatus.BAD_REQUEST)
113   public ErrorResponse illegalStateException(IllegalStateException e)
114   {
115     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
116   }
117 }