recorder: 1.1.0 - recordings are sent as JSON
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / recorder / RecorderController.java
1 package de.juplo.kafka.wordcount.recorder;
2
3 import org.apache.kafka.clients.producer.KafkaProducer;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.springframework.http.HttpStatus;
6 import org.springframework.http.ResponseEntity;
7 import org.springframework.util.MimeTypeUtils;
8 import org.springframework.web.bind.annotation.PathVariable;
9 import org.springframework.web.bind.annotation.PostMapping;
10 import org.springframework.web.bind.annotation.RequestBody;
11 import org.springframework.web.bind.annotation.RestController;
12 import org.springframework.web.context.request.async.DeferredResult;
13
14 import jakarta.validation.constraints.NotEmpty;
15
16
17 @RestController
18 public class RecorderController
19 {
20   private final String topic;
21   private final KafkaProducer<String, Recording> producer;
22
23
24   public RecorderController(
25       RecorderApplicationProperties properties,
26       KafkaProducer<String,Recording> producer)
27   {
28     this.topic = properties.getTopic();
29     this.producer = producer;
30   }
31
32   @PostMapping(
33       path = "/{username}",
34       consumes = {
35           MimeTypeUtils.TEXT_PLAIN_VALUE,
36           MimeTypeUtils.APPLICATION_JSON_VALUE
37       },
38       produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
39   DeferredResult<ResponseEntity<RecordingResult>> speak(
40       @PathVariable
41       @NotEmpty(message = "A username must be provided")
42       String username,
43       @RequestBody
44       @NotEmpty(message = "The spoken sentence must not be empty!")
45       String sentence)
46   {
47     DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
48
49     ProducerRecord<String, Recording> record = new ProducerRecord<>(
50         topic,
51         username,
52         Recording.of(username, sentence));
53
54     producer.send(record, (metadata, exception) ->
55     {
56       if (metadata != null)
57       {
58         result.setResult(
59             ResponseEntity.ok(RecordingResult.of(
60                 username,
61                 sentence,
62                 topic,
63                 metadata.partition(),
64                 metadata.offset(),
65                 null,
66                 null)));
67       }
68       else
69       {
70         result.setErrorResult(
71             ResponseEntity
72                 .internalServerError()
73                 .body(RecordingResult.of(
74                     username,
75                     sentence,
76                     topic,
77                     null,
78                     null,
79                     HttpStatus.INTERNAL_SERVER_ERROR.value(),
80                     exception.toString())));
81       }
82     });
83
84     return result;
85   }
86 }