56f3382415e4d130e03c69d2e7a1e8ad8818c119
[demos/kafka/training] / src / main / java / de / juplo / kafka / RestProducer.java
1 package de.juplo.kafka;
2
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.HttpStatus;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;

import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
13
14
15 @Slf4j
16 @RestController
17 public class RestProducer
18 {
19   private final String id;
20   private final String topic;
21   private final KafkaTemplate<String, Object> kafkaTemplate;
22
23   private long produced = 0;
24
25   public RestProducer(
26       ApplicationProperties properties,
27       KafkaTemplate<String, Object> kafkaTemplate)
28   {
29     this.id = properties.getClientId();
30     this.topic = properties.getTopic();
31     this.kafkaTemplate = kafkaTemplate;
32   }
33
34   @PostMapping(path = "{key}")
35   public DeferredResult<ProduceResult> message(
36       @PathVariable String key,
37       @RequestBody String value)
38   {
39     key = key.trim();
40     final ProducerRecord<String, Object> record = new ProducerRecord<>(
41         topic,  // Topic
42         key,    // Key
43         new ClientMessage(key, value) // Value
44     );
45
46     return send(record);
47   }
48
49   @PutMapping(path = "{key}")
50   public DeferredResult<ProduceResult> message(@PathVariable String key)
51   {
52     key = key.trim();
53     final ProducerRecord<String, Object> record = new ProducerRecord<>(
54         topic,  // Topic
55         key,    // Key
56         new FooMessage(key, System.currentTimeMillis()) // Value
57     );
58
59     return send(record);
60   }
61
62   @PostMapping(path = "/")
63   public DeferredResult<ProduceResult> greeting(
64       @RequestBody String name)
65   {
66     name = name.trim();
67     final ProducerRecord<String, Object> record = new ProducerRecord<>(
68         topic,  // Topic
69         name,    // Key
70         new Greeting(name) // Value
71     );
72
73     return send(record);
74   }
75
76   private DeferredResult<ProduceResult> send(ProducerRecord<String, Object> record)
77   {
78     DeferredResult<ProduceResult> result = new DeferredResult<>();
79
80     final long time = System.currentTimeMillis();
81
82     kafkaTemplate.send(record).addCallback(
83       (sendResult) ->
84       {
85         long now = System.currentTimeMillis();
86
87         // HANDLE SUCCESS
88         RecordMetadata metadata = sendResult.getRecordMetadata();
89         produced++;
90         result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
91         log.debug(
92             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
93             id,
94             record.key(),
95             record.value(),
96             metadata.partition(),
97             metadata.offset(),
98             metadata.timestamp(),
99             now - time
100         );
101       },
102       (e) ->
103       {
104         long now = System.currentTimeMillis();
105
106         // HANDLE ERROR
107         result.setErrorResult(new ProduceFailure(e));
108         log.error(
109             "{} - ERROR key={} timestamp=-1 latency={}ms: {}",
110             id,
111             record.key(),
112             now - time,
113             e.toString()
114         );
115       });
116
117     long now = System.currentTimeMillis();
118     log.trace(
119         "{} - Queued key={} latency={}ms",
120         id,
121         record.key(),
122         now - time
123     );
124
125     return result;
126   }
127
128   @ExceptionHandler
129   @ResponseStatus(HttpStatus.BAD_REQUEST)
130   public ErrorResponse illegalStateException(IllegalStateException e)
131   {
132     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
133   }
134
135   @PreDestroy
136   public void destroy() throws ExecutionException, InterruptedException
137   {
138     log.info("{} - Destroy!", id);
139     log.info("{}: Produced {} messages in total, exiting!", id, produced);
140   }
141 }