users:1.0.0 - users are directly send to kafka
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersController.java
1 package de.juplo.kafka.wordcount.users;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.springframework.http.HttpStatus;
8 import org.springframework.http.ResponseEntity;
9 import org.springframework.util.MimeTypeUtils;
10 import org.springframework.web.bind.annotation.PostMapping;
11 import org.springframework.web.bind.annotation.RequestBody;
12 import org.springframework.web.bind.annotation.RestController;
13 import org.springframework.web.context.request.async.DeferredResult;
14
15
16 @RestController
17 public class UsersController
18 {
19   private final String topic;
20   private final ObjectMapper mapper;
21   private final KafkaProducer<String, String> producer;
22
23
24   public UsersController(
25       UsersApplicationProperties properties,
26       ObjectMapper mapper,
27       KafkaProducer<String,String> producer)
28   {
29     this.topic = properties.getTopic();
30     this.mapper = mapper;
31     this.producer = producer;
32   }
33
34   @PostMapping(
35       path = "/users",
36       consumes = {
37           MimeTypeUtils.TEXT_PLAIN_VALUE,
38           MimeTypeUtils.APPLICATION_JSON_VALUE
39       },
40       produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
41   DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
42   {
43     DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
44
45     String value = mapper.writeValueAsString(user);
46     ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
47     producer.send(record, (metadata, exception) ->
48     {
49       if (metadata != null)
50       {
51         result.setResult(
52             ResponseEntity.ok(UsersResult.of(
53                 user.getUsername(),
54                 user,
55                 topic,
56                 metadata.partition(),
57                 metadata.offset(),
58                 null,
59                 null)));
60       }
61       else
62       {
63         result.setErrorResult(
64             ResponseEntity
65                 .internalServerError()
66                 .body(UsersResult.of(
67                     user.getUsername(),
68                     user,
69                     topic,
70                     null,
71                     null,
72                     HttpStatus.INTERNAL_SERVER_ERROR.value(),
73                     exception.toString())));
74       }
75     });
76
77     return result;
78   }
79 }