X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Frecorder%2FRecorderController.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Frecorder%2FRecorderController.java;h=5fe69adbc6e111b44234c7fc001c35e023f582ef;hb=338279a329a06be7a141a3930d80b2a2805719dc;hp=0000000000000000000000000000000000000000;hpb=5b426263adb46e835333b127c1755c0a23035ece;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java new file mode 100644 index 0000000..5fe69ad --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java @@ -0,0 +1,80 @@ +package de.juplo.kafka.wordcount.recorder; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.util.MimeTypeUtils; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; + +import javax.validation.constraints.NotEmpty; + + +@RestController +public class RecorderController +{ + private final String topic; + private final KafkaProducer producer; + + + public RecorderController(RecorderApplicationProperties properties, KafkaProducer producer) + { + this.topic = properties.getTopic(); + this.producer = producer; + } + + @PostMapping( + path = "/{username}", + consumes = { + MimeTypeUtils.TEXT_PLAIN_VALUE, + MimeTypeUtils.APPLICATION_JSON_VALUE + }, + produces = MimeTypeUtils.APPLICATION_JSON_VALUE) + DeferredResult> speak( + @PathVariable + @NotEmpty(message = "A username must be provided") + String username, + @RequestBody + @NotEmpty(message = "The spoken sentence must not be empty!") + String sentence) + { + DeferredResult> result = new DeferredResult<>(); + + ProducerRecord record = new ProducerRecord<>(topic, username, sentence); + producer.send(record, (metadata, exception) -> + { + if (metadata != null) + { + result.setResult( + ResponseEntity.ok(RecordingResult.of( + username, + sentence, + topic, + metadata.partition(), + metadata.offset(), + null, + null))); + } + else + { + result.setErrorResult( + ResponseEntity + .internalServerError() + .body(RecordingResult.of( + username, + sentence, + topic, + null, + null, + HttpStatus.INTERNAL_SERVER_ERROR.value(), + exception.toString()))); + } + }); + + return result; + } +}