users: 1.0.1 - added an endpoint to receive details fo a user
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersController.java
index 4e64524..b0ab941 100644 (file)
@@ -4,14 +4,16 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.util.MimeTypeUtils;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
+import java.net.URI;
+import java.util.Optional;
+
 
 @RestController
 public class UsersController
@@ -19,16 +21,20 @@ public class UsersController
   private final String topic;
   private final ObjectMapper mapper;
   private final KafkaProducer<String, String> producer;
+  private final UsersStreamProcessor processor;
+
 
 
   public UsersController(
       UsersApplicationProperties properties,
       ObjectMapper mapper,
-      KafkaProducer<String,String> producer)
+      KafkaProducer<String,String> producer,
+      UsersStreamProcessor processor)
   {
     this.topic = properties.getTopic();
     this.mapper = mapper;
     this.producer = producer;
+    this.processor = processor;
   }
 
   @PostMapping(
@@ -76,4 +82,27 @@ public class UsersController
 
     return result;
   }
+
+  @GetMapping("/users/{username}")
+  ResponseEntity<User> queryFor(@PathVariable String username)
+  {
+    Optional<URI> redirect = processor.getRedirect(username);
+    if (redirect.isPresent())
+    {
+      return
+          ResponseEntity
+              .status(HttpStatus.TEMPORARY_REDIRECT)
+              .location(redirect.get())
+              .build();
+    }
+
+    try
+    {
+      return ResponseEntity.of(processor.getUser(username));
+    }
+    catch (InvalidStateStoreException e)
+    {
+      return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
+    }
+  }
 }