import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.MimeTypeUtils;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
+import java.net.URI;
+import java.util.Optional;
+
@RestController
public class UsersController
private final String topic;
private final ObjectMapper mapper;
private final KafkaProducer<String, String> producer;
+ private final UsersStreamProcessor processor;
+
public UsersController(
UsersApplicationProperties properties,
ObjectMapper mapper,
- KafkaProducer<String,String> producer)
+ KafkaProducer<String,String> producer,
+ UsersStreamProcessor processor)
{
this.topic = properties.getTopic();
this.mapper = mapper;
this.producer = producer;
+ this.processor = processor;
}
@PostMapping(
return result;
}
+
+ @GetMapping("/users/{username}")
+ ResponseEntity<User> queryFor(@PathVariable String username)
+ {
+ Optional<URI> redirect = processor.getRedirect(username);
+ if (redirect.isPresent())
+ {
+ return
+ ResponseEntity
+ .status(HttpStatus.TEMPORARY_REDIRECT)
+ .location(redirect.get())
+ .build();
+ }
+
+ try
+ {
+ return ResponseEntity.of(processor.getUser(username));
+ }
+ catch (InvalidStateStoreException e)
+ {
+ return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
+ }
+ }
}