query:1.0.2 - Fixed responses for availability edge-cases
authorKai Moritz <kai@juplo.de>
Sat, 4 Sep 2021 15:35:53 +0000 (17:35 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 11 Oct 2021 18:47:29 +0000 (20:47 +0200)
* No invalid redirect, if the host-info ist not available
* 503 Service Temporarily Unavailable, while streams is not ready

pom.xml
src/main/java/de/juplo/kafka/wordcount/query/QueryController.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java

diff --git a/pom.xml b/pom.xml
index 2dd080b..56a2b76 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>query</artifactId>
-       <version>1.0.1</version>
+       <version>1.0.2</version>
        <name>Wordcount-Query</name>
        <description>Query stream-processor of the multi-user wordcount-example</description>
        <properties>
index 0c7dc31..a9b5b80 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.wordcount.query;
 
 import lombok.RequiredArgsConstructor;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -30,6 +31,13 @@ public class QueryController
               .build();
     }
 
-    return ResponseEntity.of(processor.getUserRanking(username));
+    try
+    {
+      return ResponseEntity.of(processor.getUserRanking(username));
+    }
+    catch (InvalidStateStoreException e)
+    {
+      return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
+    }
   }
 }
index f7dc750..696e088 100644 (file)
@@ -99,7 +99,7 @@ public class QueryStreamProcessor
                HostInfo activeHost = metadata.activeHost();
                log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
 
-               if (activeHost.equals(this.hostInfo))
+               if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable()))
                {
                        return Optional.empty();
                }