recorder:1.0.0 - send recordings keyed by username
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / recorder / RecorderController.java
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java
new file mode 100644 (file)
index 0000000..5fe69ad
--- /dev/null
@@ -0,0 +1,80 @@
+package de.juplo.kafka.wordcount.recorder;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import javax.validation.constraints.NotEmpty;
+
+
+@RestController
+public class RecorderController
+{
+  private final String topic;
+  private final KafkaProducer<String, String> producer;
+
+
+  public RecorderController(RecorderApplicationProperties properties, KafkaProducer<String,String> producer)
+  {
+    this.topic = properties.getTopic();
+    this.producer = producer;
+  }
+
+  @PostMapping(
+      path = "/{username}",
+      consumes = {
+          MimeTypeUtils.TEXT_PLAIN_VALUE,
+          MimeTypeUtils.APPLICATION_JSON_VALUE
+      },
+      produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
+  DeferredResult<ResponseEntity<RecordingResult>> speak(
+      @PathVariable
+      @NotEmpty(message = "A username must be provided")
+      String username,
+      @RequestBody
+      @NotEmpty(message = "The spoken sentence must not be empty!")
+      String sentence)
+  {
+    DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
+
+    ProducerRecord<String, String> record = new ProducerRecord<>(topic, username, sentence);
+    producer.send(record, (metadata, exception) ->
+    {
+      if (metadata != null)
+      {
+        result.setResult(
+            ResponseEntity.ok(RecordingResult.of(
+                username,
+                sentence,
+                topic,
+                metadata.partition(),
+                metadata.offset(),
+                null,
+                null)));
+      }
+      else
+      {
+        result.setErrorResult(
+            ResponseEntity
+                .internalServerError()
+                .body(RecordingResult.of(
+                    username,
+                    sentence,
+                    topic,
+                    null,
+                    null,
+                    HttpStatus.INTERNAL_SERVER_ERROR.value(),
+                    exception.toString())));
+      }
+    });
+
+    return result;
+  }
+}