recorder:1.0.0 - send recordings keyed by username
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / recorder / RecorderController.java
1 package de.juplo.kafka.wordcount.recorder;
2
3 import org.apache.kafka.clients.producer.KafkaProducer;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.springframework.http.HttpStatus;
6 import org.springframework.http.ResponseEntity;
7 import org.springframework.util.MimeTypeUtils;
8 import org.springframework.web.bind.annotation.PathVariable;
9 import org.springframework.web.bind.annotation.PostMapping;
10 import org.springframework.web.bind.annotation.RequestBody;
11 import org.springframework.web.bind.annotation.RestController;
12 import org.springframework.web.context.request.async.DeferredResult;
13
14 import javax.validation.constraints.NotEmpty;
15
16
17 @RestController
18 public class RecorderController
19 {
20   private final String topic;
21   private final KafkaProducer<String, String> producer;
22
23
24   public RecorderController(RecorderApplicationProperties properties, KafkaProducer<String,String> producer)
25   {
26     this.topic = properties.getTopic();
27     this.producer = producer;
28   }
29
30   @PostMapping(
31       path = "/{username}",
32       consumes = {
33           MimeTypeUtils.TEXT_PLAIN_VALUE,
34           MimeTypeUtils.APPLICATION_JSON_VALUE
35       },
36       produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
37   DeferredResult<ResponseEntity<RecordingResult>> speak(
38       @PathVariable
39       @NotEmpty(message = "A username must be provided")
40       String username,
41       @RequestBody
42       @NotEmpty(message = "The spoken sentence must not be empty!")
43       String sentence)
44   {
45     DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
46
47     ProducerRecord<String, String> record = new ProducerRecord<>(topic, username, sentence);
48     producer.send(record, (metadata, exception) ->
49     {
50       if (metadata != null)
51       {
52         result.setResult(
53             ResponseEntity.ok(RecordingResult.of(
54                 username,
55                 sentence,
56                 topic,
57                 metadata.partition(),
58                 metadata.offset(),
59                 null,
60                 null)));
61       }
62       else
63       {
64         result.setErrorResult(
65             ResponseEntity
66                 .internalServerError()
67                 .body(RecordingResult.of(
68                     username,
69                     sentence,
70                     topic,
71                     null,
72                     null,
73                     HttpStatus.INTERNAL_SERVER_ERROR.value(),
74                     exception.toString())));
75       }
76     });
77
78     return result;
79   }
80 }