+ @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE)
+ public ResponseEntity<String> getWatermarks(@PathVariable Integer partition)
+ {
+ KeyQueryMetadata metadata =
+ streams.queryMetadataForKey(
+ DeduplicationTransformer.STORE,
+ partition, new IntegerSerializer());
+
+ if (metadata.getActiveHost().port() != port ||
+ !metadata.getActiveHost().host().equals(host))
+ {
+ return
+ ResponseEntity
+ .status(HttpStatus.TEMPORARY_REDIRECT)
+ .header(
+ HttpHeaders.LOCATION,
+ "http://" +
+ metadata.getActiveHost().host() +
+ ":" +
+ metadata.getActiveHost().port() +
+ "/watermark/" +
+ partition)
+ .build();
+ }
+
+ ReadOnlyKeyValueStore<Integer, Long> store = streams.store(
+ StoreQueryParameters.fromNameAndType(
+ DeduplicationTransformer.STORE,
+ QueryableStoreTypes.keyValueStore()));
+
+ Long watermark = store.get(partition);
+ if (watermark == null)
+ return ResponseEntity.notFound().build();
+
+ return ResponseEntity.ok().body(watermark.toString());
+ }
+
+