--- /dev/null
+package de.juplo.kafka.wordcount.users;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+
+
+@RestController
+public class UsersController
+{
+ private final String topic;
+ private final ObjectMapper mapper;
+ private final KafkaProducer<String, String> producer;
+
+
+ public UsersController(
+ UsersApplicationProperties properties,
+ ObjectMapper mapper,
+ KafkaProducer<String,String> producer)
+ {
+ this.topic = properties.getTopic();
+ this.mapper = mapper;
+ this.producer = producer;
+ }
+
+ @PostMapping(
+ path = "/users",
+ consumes = {
+ MimeTypeUtils.TEXT_PLAIN_VALUE,
+ MimeTypeUtils.APPLICATION_JSON_VALUE
+ },
+ produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
+ DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
+ {
+ DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
+
+ String value = mapper.writeValueAsString(user);
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
+ producer.send(record, (metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ result.setResult(
+ ResponseEntity.ok(UsersResult.of(
+ user.getUsername(),
+ user,
+ topic,
+ metadata.partition(),
+ metadata.offset(),
+ null,
+ null)));
+ }
+ else
+ {
+ result.setErrorResult(
+ ResponseEntity
+ .internalServerError()
+ .body(UsersResult.of(
+ user.getUsername(),
+ user,
+ topic,
+ null,
+ null,
+ HttpStatus.INTERNAL_SERVER_ERROR.value(),
+ exception.toString())));
+ }
+ });
+
+ return result;
+ }
+}