+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.util.MimeTypeUtils;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.validation.constraints.NotEmpty;
-
-
-@RestController
-public class RecorderController
-{
- private final String topic;
- private final KafkaProducer<String, String> producer;
-
-
- public RecorderController(RecorderApplicationProperties properties, KafkaProducer<String,String> producer)
- {
- this.topic = properties.getTopic();
- this.producer = producer;
- }
-
- @PostMapping(
- path = "/{username}",
- consumes = {
- MimeTypeUtils.TEXT_PLAIN_VALUE,
- MimeTypeUtils.APPLICATION_JSON_VALUE
- },
- produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
- DeferredResult<ResponseEntity<RecordingResult>> speak(
- @PathVariable
- @NotEmpty(message = "A username must be provided")
- String username,
- @RequestBody
- @NotEmpty(message = "The spoken sentence must not be empty!")
- String sentence)
- {
- DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
-
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, username, sentence);
- producer.send(record, (metadata, exception) ->
- {
- if (metadata != null)
- {
- result.setResult(
- ResponseEntity.ok(RecordingResult.of(
- username,
- sentence,
- topic,
- metadata.partition(),
- metadata.offset(),
- null,
- null)));
- }
- else
- {
- result.setErrorResult(
- ResponseEntity
- .internalServerError()
- .body(RecordingResult.of(
- username,
- sentence,
- topic,
- null,
- null,
- HttpStatus.INTERNAL_SERVER_ERROR.value(),
- exception.toString())));
- }
- });
-
- return result;
- }
-}