1 package de.juplo.kafka.wordcount.users;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.springframework.http.HttpStatus;
8 import org.springframework.http.ResponseEntity;
9 import org.springframework.util.MimeTypeUtils;
10 import org.springframework.web.bind.annotation.PostMapping;
11 import org.springframework.web.bind.annotation.RequestBody;
12 import org.springframework.web.bind.annotation.RestController;
13 import org.springframework.web.context.request.async.DeferredResult;
// Controller that accepts User payloads and publishes them to a Kafka topic.
// NOTE(review): class-level annotations (e.g. @RestController, which is imported)
// are not visible in this extract -- confirm against the full file.
17 public class UsersController
// Name of the Kafka topic the user records are sent to.
19 private final String topic;
// Jackson mapper used to serialize the incoming User to a JSON string.
20 private final ObjectMapper mapper;
// Kafka producer with String keys and String values.
21 private final KafkaProducer<String, String> producer;
// Creates the controller from the application properties and a Kafka producer.
// The target topic is read from properties.getTopic(); the producer is stored as-is.
// NOTE(review): the final field `mapper` must also be initialized here, but neither
// an ObjectMapper parameter nor its assignment is visible in this extract -- confirm.
24 public UsersController(
25 UsersApplicationProperties properties,
27 KafkaProducer<String,String> producer)
29 this.topic = properties.getTopic();
31 this.producer = producer;
// Media types listed in the mapping annotation (presumably its `consumes` list;
// the annotation header is not visible in this extract -- confirm).
37 MimeTypeUtils.TEXT_PLAIN_VALUE,
38 MimeTypeUtils.APPLICATION_JSON_VALUE
// The handler declares that it produces JSON.
40 produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
// Handles a posted User: serializes it to JSON and sends it to Kafka.
// The response is delivered through a DeferredResult. The error result is set inside
// the producer's send callback, and the success response is presumably set there
// too (the line that sets it is not visible in this extract).
// Declares JsonProcessingException, which mapper.writeValueAsString(user) can throw.
41 DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
43 DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
// Serialize the user; the JSON string becomes the record value.
45 String value = mapper.writeValueAsString(user);
// The username is used as the record key.
46 ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
// Send with a callback that receives the record metadata and/or an exception.
// NOTE(review): the condition that separates the success and failure branches
// is not visible in this extract -- confirm against the full file.
47 producer.send(record, (metadata, exception) ->
// Success response: 200 OK wrapping a UsersResult (its arguments are not visible here).
52 ResponseEntity.ok(UsersResult.of(
// Failure response: an internal-server-error ResponseEntity set as the error result;
// the visible arguments are the 500 status value and exception.toString().
63 result.setErrorResult(
65 .internalServerError()
72 HttpStatus.INTERNAL_SERVER_ERROR.value(),
73 exception.toString())));