users:1.0.0 - users are directly send to kafka
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersController.java
diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java
new file mode 100644 (file)
index 0000000..4e64524
--- /dev/null
@@ -0,0 +1,79 @@
+package de.juplo.kafka.wordcount.users;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+
+
+@RestController
+public class UsersController
+{
+  private final String topic;
+  private final ObjectMapper mapper;
+  private final KafkaProducer<String, String> producer;
+
+
+  public UsersController(
+      UsersApplicationProperties properties,
+      ObjectMapper mapper,
+      KafkaProducer<String,String> producer)
+  {
+    this.topic = properties.getTopic();
+    this.mapper = mapper;
+    this.producer = producer;
+  }
+
+  @PostMapping(
+      path = "/users",
+      consumes = {
+          MimeTypeUtils.TEXT_PLAIN_VALUE,
+          MimeTypeUtils.APPLICATION_JSON_VALUE
+      },
+      produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
+  DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
+  {
+    DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
+
+    String value = mapper.writeValueAsString(user);
+    ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
+    producer.send(record, (metadata, exception) ->
+    {
+      if (metadata != null)
+      {
+        result.setResult(
+            ResponseEntity.ok(UsersResult.of(
+                user.getUsername(),
+                user,
+                topic,
+                metadata.partition(),
+                metadata.offset(),
+                null,
+                null)));
+      }
+      else
+      {
+        result.setErrorResult(
+            ResponseEntity
+                .internalServerError()
+                .body(UsersResult.of(
+                    user.getUsername(),
+                    user,
+                    topic,
+                    null,
+                    null,
+                    HttpStatus.INTERNAL_SERVER_ERROR.value(),
+                    exception.toString())));
+      }
+    });
+
+    return result;
+  }
+}