1 package de.juplo.kafka.wordcount.query;
3 import lombok.RequiredArgsConstructor;
4 import org.apache.kafka.streams.errors.InvalidStateStoreException;
5 import org.springframework.http.HttpStatus;
6 import org.springframework.http.ResponseEntity;
7 import org.springframework.web.bind.annotation.GetMapping;
8 import org.springframework.web.bind.annotation.PathVariable;
9 import org.springframework.web.bind.annotation.RestController;
12 import java.util.Optional;
16 @RequiredArgsConstructor
17 public class QueryController
19 private final QueryStreamProcessor processor;
21 @GetMapping("{username}")
22 ResponseEntity<UserRanking> queryFor(@PathVariable String username)
24 Optional<URI> redirect = processor.getRedirect(username);
25 if (redirect.isPresent())
29 .status(HttpStatus.TEMPORARY_REDIRECT)
30 .location(redirect.get())
36 return ResponseEntity.of(processor.getUserRanking(username));
38 catch (InvalidStateStoreException e)
40 return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();