Vorlage für die JSON-Version des Rest-Producers
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestProducer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.springframework.http.HttpStatus;
8 import org.springframework.web.bind.annotation.*;
9 import org.springframework.web.context.request.async.DeferredResult;
10
11 import java.math.BigInteger;
12
13
14 @Slf4j
15 @RequestMapping
16 @ResponseBody
17 @RequiredArgsConstructor
18 public class RestProducer
19 {
20   private final String id;
21   private final String topic;
22   private final Integer partition;
23   private final KafkaProducer<String, Object> producer;
24
25   private long produced = 0;
26
27   @PostMapping(path = "{key}")
28   public DeferredResult<ProduceResult> send(
29       @PathVariable String key,
30       @RequestHeader(name = "X-id", required = false) Long correlationId,
31       @RequestBody Integer number)
32   {
33     ResultRecorder result = new ResultRecorder(1);
34
35     send(key, number, correlationId, result);
36
37     return result.getDeferredResult();
38   }
39
40   private void send(
41       String key,
42       Object value,
43       Long correlationId,
44       ResultRecorder result)
45   {
46     final long time = System.currentTimeMillis();
47
48     final ProducerRecord<String, Object> record = new ProducerRecord<>(
49         topic,  // Topic
50         partition, // Partition
51         key,    // Key
52         value   // Value
53     );
54
55     record.headers().add("source", id.getBytes());
56     if (correlationId != null)
57     {
58       record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
59     }
60
61     producer.send(record, (metadata, e) ->
62     {
63       long now = System.currentTimeMillis();
64       if (e == null)
65       {
66         // HANDLE SUCCESS
67         result.addSuccess(metadata);
68         produced++;
69         log.debug(
70             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
71             id,
72             record.key(),
73             record.value(),
74             metadata.partition(),
75             metadata.offset(),
76             metadata.timestamp(),
77             now - time
78         );
79       }
80       else
81       {
82         // HANDLE ERROR
83         result.addFailure(e);
84         log.error(
85             "{} - ERROR key={} timestamp={} latency={}ms: {}",
86             id,
87             record.key(),
88             metadata == null ? -1 : metadata.timestamp(),
89             now - time,
90             e.toString()
91         );
92       }
93     });
94
95     long now = System.currentTimeMillis();
96     log.trace(
97         "{} - Queued message with key={} latency={}ms",
98         id,
99         record.key(),
100         now - time
101     );
102   }
103
104   @ExceptionHandler
105   @ResponseStatus(HttpStatus.BAD_REQUEST)
106   public ErrorResponse illegalStateException(IllegalStateException e)
107   {
108     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
109   }
110 }