users: 1.0.2 - simplified the path (removed /users)
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersController.java
1 package de.juplo.kafka.wordcount.users;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.streams.errors.InvalidStateStoreException;
8 import org.springframework.http.HttpStatus;
9 import org.springframework.http.ResponseEntity;
10 import org.springframework.util.MimeTypeUtils;
11 import org.springframework.web.bind.annotation.*;
12 import org.springframework.web.context.request.async.DeferredResult;
13
14 import java.net.URI;
15 import java.util.Optional;
16
17
18 @RestController
19 public class UsersController
20 {
21   private final String topic;
22   private final ObjectMapper mapper;
23   private final KafkaProducer<String, String> producer;
24   private final UsersStreamProcessor processor;
25
26
27
28   public UsersController(
29       UsersApplicationProperties properties,
30       ObjectMapper mapper,
31       KafkaProducer<String,String> producer,
32       UsersStreamProcessor processor)
33   {
34     this.topic = properties.getTopic();
35     this.mapper = mapper;
36     this.producer = producer;
37     this.processor = processor;
38   }
39
40   @PostMapping(
41       path = "/",
42       consumes = {
43           MimeTypeUtils.TEXT_PLAIN_VALUE,
44           MimeTypeUtils.APPLICATION_JSON_VALUE
45       },
46       produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
47   DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
48   {
49     DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
50
51     String value = mapper.writeValueAsString(user);
52     ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
53     producer.send(record, (metadata, exception) ->
54     {
55       if (metadata != null)
56       {
57         result.setResult(
58             ResponseEntity.ok(UsersResult.of(
59                 user.getUsername(),
60                 user,
61                 topic,
62                 metadata.partition(),
63                 metadata.offset(),
64                 null,
65                 null)));
66       }
67       else
68       {
69         result.setErrorResult(
70             ResponseEntity
71                 .internalServerError()
72                 .body(UsersResult.of(
73                     user.getUsername(),
74                     user,
75                     topic,
76                     null,
77                     null,
78                     HttpStatus.INTERNAL_SERVER_ERROR.value(),
79                     exception.toString())));
80       }
81     });
82
83     return result;
84   }
85
86   @GetMapping("/{username}")
87   ResponseEntity<User> queryFor(@PathVariable String username)
88   {
89     Optional<URI> redirect = processor.getRedirect(username);
90     if (redirect.isPresent())
91     {
92       return
93           ResponseEntity
94               .status(HttpStatus.TEMPORARY_REDIRECT)
95               .location(redirect.get())
96               .build();
97     }
98
99     try
100     {
101       return ResponseEntity.of(processor.getUser(username));
102     }
103     catch (InvalidStateStoreException e)
104     {
105       return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
106     }
107   }
108 }