1 package de.juplo.kafka.wordcount.users;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.streams.errors.InvalidStateStoreException;
8 import org.springframework.http.HttpStatus;
9 import org.springframework.http.ResponseEntity;
10 import org.springframework.util.MimeTypeUtils;
11 import org.springframework.web.bind.annotation.*;
12 import org.springframework.web.context.request.async.DeferredResult;
15 import java.util.Optional;
// HTTP controller for users. It offers two handlers:
//  - post:     serializes a User to JSON and sends it to a Kafka topic,
//              completing the HTTP response from the producer callback.
//  - queryFor: looks a user up by username through the UsersStreamProcessor.
// NOTE(review): this excerpt omits several lines (braces, annotations, parts
// of the callback); comments below describe only what is visible.
19 public class UsersController
// Kafka topic that user records are sent to; taken from the application properties.
21 private final String topic;
// Used by post() to turn the User into its JSON string representation.
// NOTE(review): the assignment of this field is not visible in this excerpt.
22 private final ObjectMapper mapper;
// Producer used by post(); both key and value are plain strings.
23 private final KafkaProducer<String, String> producer;
// Answers the redirect and user lookups in queryFor().
24 private final UsersStreamProcessor processor;
// Creates the controller from its collaborators.
//   properties - supplies the topic name via getTopic()
//   producer   - Kafka producer used to send user records
//   processor  - stream processor that is queried for users and redirects
// NOTE(review): a parameter between 'properties' and 'producer' (presumably the
// ObjectMapper) is not visible here - confirm against the full file.
28 public UsersController(
29 UsersApplicationProperties properties,
31 KafkaProducer<String,String> producer,
32 UsersStreamProcessor processor)
34 this.topic = properties.getTopic();
36 this.producer = producer;
37 this.processor = processor;
// Tail of the request-mapping annotation for post(): two MIME types (text/plain
// and application/json; presumably the accepted request content types - the
// attribute name is outside this excerpt), and JSON as the produced type.
43 MimeTypeUtils.TEXT_PLAIN_VALUE,
44 MimeTypeUtils.APPLICATION_JSON_VALUE
46 produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
// Sends the user from the request body to Kafka.
// The response is a DeferredResult, so it is completed later from the
// producer's send callback rather than by this method directly.
// Throws JsonProcessingException if the user cannot be serialized to JSON.
47 DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
49 DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
// The record value is the JSON form of the user; the record key is the username.
51 String value = mapper.writeValueAsString(user);
52 ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
// Asynchronous send; the callback receives the record metadata and an exception.
// NOTE(review): the branching between the two outcomes below is not visible
// here - presumably the first is the success case and the second the failure case.
53 producer.send(record, (metadata, exception) ->
// Builds a 200 OK response wrapping a UsersResult.
58 ResponseEntity.ok(UsersResult.of(
// Completes the deferred result with an internal-server-error response whose
// UsersResult arguments include the numeric 500 status and exception.toString().
69 result.setErrorResult(
71 .internalServerError()
78 HttpStatus.INTERNAL_SERVER_ERROR.value(),
79 exception.toString())));
// Looks up a user by the username taken from the request path.
86 @GetMapping("/users/{username}")
87 ResponseEntity<User> queryFor(@PathVariable String username)
// Ask the processor whether this request should be redirected to another URI.
// NOTE(review): presumably the redirect targets the instance that holds the
// state for this username - confirm in UsersStreamProcessor.
89 Optional<URI> redirect = processor.getRedirect(username);
90 if (redirect.isPresent())
// Redirect case: status 307 (Temporary Redirect) with the Location header set
// to the redirect URI.
94 .status(HttpStatus.TEMPORARY_REDIRECT)
95 .location(redirect.get())
// Otherwise build the response from the processor's lookup result.
// NOTE(review): getUser presumably returns Optional<User>, which
// ResponseEntity.of turns into 200 with body or 404 - confirm.
101 return ResponseEntity.of(processor.getUser(username));
// The state store could not be queried: answer 503 Service Unavailable.
103 catch (InvalidStateStoreException e)
105 return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();