Über `?error=1` kann ein Fehler den Nachrichtenstrom eingebettet werden
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestProducer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.springframework.http.HttpStatus;
8 import org.springframework.web.bind.annotation.*;
9 import org.springframework.web.context.request.async.DeferredResult;
10
11 import java.math.BigInteger;
12
13
14 @Slf4j
15 @RequestMapping
16 @ResponseBody
17 @RequiredArgsConstructor
18 public class RestProducer
19 {
20   private final String id;
21   private final String topic;
22   private final Integer partition;
23   private final KafkaProducer<String, Object> producer;
24
25   private long produced = 0;
26
27   @PostMapping(path = "{key}")
28   public DeferredResult<ProduceResult> send(
29       @PathVariable String key,
30       @RequestHeader(name = "X-id", required = false) Long correlationId,
31       @RequestBody Integer number,
32       @RequestParam(required = false) boolean error)
33   {
34     ResultRecorder result = new ResultRecorder(number+1);
35
36     for (int i = 1; i <= number; i++)
37     {
38       int next = error && i == (number+1)/2 ? i * -1 : i;
39       send(key, new AddNumberMessage(number, next), correlationId, result);
40     }
41     send(key, new CalculateSumMessage(number), correlationId, result);
42
43     return result.getDeferredResult();
44   }
45
46   private void send(
47       String key,
48       Object value,
49       Long correlationId,
50       ResultRecorder result)
51   {
52     final long time = System.currentTimeMillis();
53
54     final ProducerRecord<String, Object> record = new ProducerRecord<>(
55         topic,  // Topic
56         partition, // Partition
57         key,    // Key
58         value   // Value
59     );
60
61     record.headers().add("source", id.getBytes());
62     if (correlationId != null)
63     {
64       record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
65     }
66
67     producer.send(record, (metadata, e) ->
68     {
69       long now = System.currentTimeMillis();
70       if (e == null)
71       {
72         // HANDLE SUCCESS
73         result.addSuccess(metadata);
74         produced++;
75         log.debug(
76             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
77             id,
78             record.key(),
79             record.value(),
80             metadata.partition(),
81             metadata.offset(),
82             metadata.timestamp(),
83             now - time
84         );
85       }
86       else
87       {
88         // HANDLE ERROR
89         result.addFailure(e);
90         log.error(
91             "{} - ERROR key={} timestamp={} latency={}ms: {}",
92             id,
93             record.key(),
94             metadata == null ? -1 : metadata.timestamp(),
95             now - time,
96             e.toString()
97         );
98       }
99     });
100
101     long now = System.currentTimeMillis();
102     log.trace(
103         "{} - Queued message with key={} latency={}ms",
104         id,
105         record.key(),
106         now - time
107     );
108   }
109
110   @ExceptionHandler
111   @ResponseStatus(HttpStatus.BAD_REQUEST)
112   public ErrorResponse illegalStateException(IllegalStateException e)
113   {
114     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
115   }
116 }