</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>query</artifactId>
- <version>1.0.1</version>
+ <version>1.0.2</version>
<name>Wordcount-Query</name>
<description>Query stream-processor of the multi-user wordcount-example</description>
<properties>
package de.juplo.kafka.wordcount.query;
import lombok.RequiredArgsConstructor;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
.build();
}
- return ResponseEntity.of(processor.getUserRanking(username));
+ try
+ {
+ return ResponseEntity.of(processor.getUserRanking(username));
+ }
+ catch (InvalidStateStoreException e)
+ {
+ return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
+ }
}
}
HostInfo activeHost = metadata.activeHost();
log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
- if (activeHost.equals(this.hostInfo))
+ if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable()))
{
return Optional.empty();
}