WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersController.java
1 package de.juplo.kafka.wordcount.users;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.wordcount.avro.User;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.springframework.http.HttpStatus;
9 import org.springframework.http.ResponseEntity;
10 import org.springframework.util.MimeTypeUtils;
11 import org.springframework.web.bind.annotation.PostMapping;
12 import org.springframework.web.bind.annotation.RequestBody;
13 import org.springframework.web.bind.annotation.RestController;
14 import org.springframework.web.context.request.async.DeferredResult;
15
16
17 @RestController
18 public class UsersController
19 {
20   private final String topic;
21   private final ObjectMapper mapper;
22   private final KafkaProducer<String, User> producer;
23
24
25   public UsersController(
26       UsersApplicationProperties properties,
27       ObjectMapper mapper,
28       KafkaProducer<String,User> producer)
29   {
30     this.topic = properties.getTopic();
31     this.mapper = mapper;
32     this.producer = producer;
33   }
34
35   @PostMapping(
36       path = "/users",
37       consumes = {
38           MimeTypeUtils.TEXT_PLAIN_VALUE,
39           MimeTypeUtils.APPLICATION_JSON_VALUE
40       },
41       produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
42   DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody UserRequest userRequest) throws JsonProcessingException
43   {
44     DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
45
46     ProducerRecord<String, User> record =
47         new ProducerRecord<>(
48             topic,
49             userRequest.getUsername(),
50             User
51                 .newBuilder()
52                 .setUsername(userRequest.getUsername())
53                 .setFirstName(userRequest.getFirstName())
54                 .setLastName(userRequest.getLastName())
55                 .build());
56     producer.send(record, (metadata, exception) ->
57     {
58       if (metadata != null)
59       {
60         result.setResult(
61             ResponseEntity.ok(UsersResult.of(
62                 userRequest.getUsername(),
63                 userRequest,
64                 topic,
65                 metadata.partition(),
66                 metadata.offset(),
67                 null,
68                 null)));
69       }
70       else
71       {
72         result.setErrorResult(
73             ResponseEntity
74                 .internalServerError()
75                 .body(UsersResult.of(
76                     userRequest.getUsername(),
77                     userRequest,
78                     topic,
79                     null,
80                     null,
81                     HttpStatus.INTERNAL_SERVER_ERROR.value(),
82                     exception.toString())));
83       }
84     });
85
86     return result;
87   }
88 }