73bec5bd82904f7cd904fdbd6ea22155df5eb441
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestProducer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.producer.Producer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.springframework.http.HttpStatus;
9 import org.springframework.web.bind.annotation.*;
10 import org.springframework.web.context.request.async.DeferredResult;
11
12 import java.math.BigInteger;
13
14
15 @Slf4j
16 @RequestMapping
17 @ResponseBody
18 @RequiredArgsConstructor
19 public class RestProducer
20 {
21   private final String id;
22   private final String topic;
23   private final Integer partition;
24   private final Producer<String, String> producer;
25
26   private long produced = 0;
27
28   @PostMapping(path = "{key}")
29   public DeferredResult<ProduceResult> send(
30       @PathVariable String key,
31       @RequestHeader(name = "X-id", required = false) Long correlationId,
32       @RequestBody String value)
33   {
34     DeferredResult<ProduceResult> result = new DeferredResult<>();
35
36     final long time = System.currentTimeMillis();
37
38     final ProducerRecord<String, String> record = new ProducerRecord<>(
39         topic,  // Topic
40         partition, // Partition
41         key,    // Key
42         value   // Value
43     );
44
45     producer.send(record, (metadata, e) ->
46     {
47       long now = System.currentTimeMillis();
48       if (e == null)
49       {
50         // HANDLE SUCCESS
51         produced++;
52         result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
53         log.debug(
54             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
55             id,
56             record.key(),
57             record.value(),
58             metadata.partition(),
59             metadata.offset(),
60             metadata.timestamp(),
61             now - time
62         );
63       }
64       else
65       {
66         // HANDLE ERROR
67         result.setErrorResult(new ProduceFailure(e));
68         log.error(
69             "{} - ERROR key={} timestamp={} latency={}ms: {}",
70             id,
71             record.key(),
72             metadata == null ? -1 : metadata.timestamp(),
73             now - time,
74             e.toString()
75         );
76       }
77     });
78
79     long now = System.currentTimeMillis();
80     log.trace(
81         "{} - Queued message with key={} latency={}ms",
82         id,
83         record.key(),
84         now - time
85     );
86
87     return result;
88   }
89
90   @ExceptionHandler
91   @ResponseStatus(HttpStatus.BAD_REQUEST)
92   public ErrorResponse illegalStateException(IllegalStateException e)
93   {
94     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
95   }
96 }