1 package de.juplo.kafka.wordcount.recorder;
3 import org.apache.kafka.clients.producer.KafkaProducer;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.springframework.http.HttpStatus;
6 import org.springframework.http.ResponseEntity;
7 import org.springframework.util.MimeTypeUtils;
8 import org.springframework.web.bind.annotation.PathVariable;
9 import org.springframework.web.bind.annotation.PostMapping;
10 import org.springframework.web.bind.annotation.RequestBody;
11 import org.springframework.web.bind.annotation.RestController;
12 import org.springframework.web.context.request.async.DeferredResult;
14 import jakarta.validation.constraints.NotEmpty;
18 public class RecorderController
20 private final String topic;
21 private final KafkaProducer<User, Recording> producer;
24 public RecorderController(
25 RecorderApplicationProperties properties,
26 KafkaProducer<User, Recording> producer)
28 this.topic = properties.getTopic();
29 this.producer = producer;
35 MimeTypeUtils.TEXT_PLAIN_VALUE,
36 MimeTypeUtils.APPLICATION_JSON_VALUE
38 produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
39 DeferredResult<ResponseEntity<RecordingResult>> speak(
41 @NotEmpty(message = "A username must be provided")
44 @NotEmpty(message = "The spoken sentence must not be empty!")
47 DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
49 ProducerRecord<User, Recording> record = new ProducerRecord<>(
52 Recording.of(username, sentence));
54 producer.send(record, (metadata, exception) ->
59 ResponseEntity.ok(RecordingResult.of(
70 result.setErrorResult(
72 .internalServerError()
73 .body(RecordingResult.of(
79 HttpStatus.INTERNAL_SERVER_ERROR.value(),
80 exception.toString())));