4549b8fb0195dd09beced3b87749c9e9f342872f
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestGateway.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.springframework.http.HttpStatus;
8 import org.springframework.web.bind.annotation.*;
9 import org.springframework.web.context.request.async.DeferredResult;
10
11
12 @Slf4j
13 @RequestMapping
14 @ResponseBody
15 @RequiredArgsConstructor
16 public class RestGateway
17 {
18   private final String id;
19   private final String topic;
20   private final KafkaProducer<String, Integer> producer;
21
22   private long produced = 0;
23
24   @PostMapping(path = "{key}")
25   public DeferredResult<ProduceResult> send(
26       @PathVariable String key,
27       @RequestHeader(name = "X-id", required = false) Long correlationId,
28       @RequestBody Integer value)
29   {
30     DeferredResult<ProduceResult> result = new DeferredResult<>();
31
32     final long time = System.currentTimeMillis();
33
34     final ProducerRecord<String, Integer> record = new ProducerRecord<>(
35         topic,  // Topic
36         key,    // Key
37         value   // Value
38     );
39
40     producer.send(record, (metadata, e) ->
41     {
42       long now = System.currentTimeMillis();
43       if (e == null)
44       {
45         // HANDLE SUCCESS
46         produced++;
47         result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
48         log.debug(
49             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
50             id,
51             record.key(),
52             record.value(),
53             metadata.partition(),
54             metadata.offset(),
55             metadata.timestamp(),
56             now - time
57         );
58       }
59       else
60       {
61         // HANDLE ERROR
62         result.setErrorResult(new ProduceFailure(e));
63         log.error(
64             "{} - ERROR key={} timestamp={} latency={}ms: {}",
65             id,
66             record.key(),
67             metadata == null ? -1 : metadata.timestamp(),
68             now - time,
69             e.toString()
70         );
71       }
72     });
73
74     long now = System.currentTimeMillis();
75     log.trace(
76         "{} - Queued message with key={} latency={}ms",
77         id,
78         record.key(),
79         now - time
80     );
81
82     return result;
83   }
84
85   @ExceptionHandler
86   @ResponseStatus(HttpStatus.BAD_REQUEST)
87   public ErrorResponse illegalStateException(IllegalStateException e)
88   {
89     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
90   }
91 }