1 package de.juplo.kafka.wordcount.users;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.wordcount.avro.User;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.springframework.http.HttpStatus;
9 import org.springframework.http.ResponseEntity;
10 import org.springframework.util.MimeTypeUtils;
11 import org.springframework.web.bind.annotation.PostMapping;
12 import org.springframework.web.bind.annotation.RequestBody;
13 import org.springframework.web.bind.annotation.RestController;
14 import org.springframework.web.context.request.async.DeferredResult;
// Controller that holds the collaborators needed to forward user data:
// a topic name, a Jackson ObjectMapper and a KafkaProducer<String, User>.
// NOTE(review): this listing has gaps (braces/annotations are not shown), so
// any class-level annotations cannot be confirmed from here.
18 public class UsersController
// Topic name; assigned from UsersApplicationProperties.getTopic() in the constructor.
// Presumably the Kafka topic the records are sent to -- confirm at the ProducerRecord construction.
20 private final String topic;
// Jackson mapper. NOTE(review): no assignment or use is visible in this excerpt -- confirm it is still needed.
21 private final ObjectMapper mapper;
// Producer with String keys and User values (User comes from the ...wordcount.avro package).
22 private final KafkaProducer<String, User> producer;
// Creates the controller from its collaborators.
//   properties - source of the topic name (read once via getTopic())
//   producer   - KafkaProducer<String, User> kept for sending records
// NOTE(review): the final field `mapper` must also be initialised here; the
// corresponding parameter/assignment is not among the lines visible in this
// excerpt -- confirm.
25 public UsersController(
26 UsersApplicationProperties properties,
28 KafkaProducer<String,User> producer)
// Read the topic name from the properties at construction time.
30 this.topic = properties.getTopic();
32 this.producer = producer;
38 MimeTypeUtils.TEXT_PLAIN_VALUE,
39 MimeTypeUtils.APPLICATION_JSON_VALUE
41 produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
// Handles a posted UserRequest (bound from the request body) and answers
// asynchronously through a DeferredResult that is completed from the
// producer's send callback.
// Declares JsonProcessingException; no statement throwing it is visible in
// this excerpt.
42 DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody UserRequest userRequest) throws JsonProcessingException
// Holder for the eventual HTTP response; filled in by the send callback below.
44 DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
// Build the record to send. The username appears as a record argument
// (presumably the key -- confirm against the full constructor call), and the
// User value is populated with username, first name and last name taken
// from the request.
46 ProducerRecord<String, User> record =
49 userRequest.getUsername(),
52 .setUsername(userRequest.getUsername())
53 .setFirstName(userRequest.getFirstName())
54 .setLastName(userRequest.getLastName())
// Hand the record to the producer; the callback receives the record metadata
// and/or the exception of the send attempt.
56 producer.send(record, (metadata, exception) ->
// Visible success path: an OK response wrapping a UsersResult built from the
// username. NOTE(review): the condition selecting this branch is not visible
// here -- presumably "metadata != null"; confirm.
61 ResponseEntity.ok(UsersResult.of(
62 userRequest.getUsername(),
// Visible failure path: the deferred result is completed as an error with an
// internal-server-error response that carries the username, the numeric
// status code and the exception's string form.
72 result.setErrorResult(
74 .internalServerError()
76 userRequest.getUsername(),
81 HttpStatus.INTERNAL_SERVER_ERROR.value(),
82 exception.toString())));