From: Kai Moritz Date: Fri, 9 Oct 2020 20:24:56 +0000 (+0200) Subject: Added an endpoint to query the watermarks by partition X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=commitdiff_plain;h=85905a556b97077ef99332928eb1401864af24e5;ds=sidebyside Added an endpoint to query the watermarks by partition --- diff --git a/docker-compose.yml b/docker-compose.yml index 17aff95..2b51357 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,6 +22,10 @@ services: deduplicator: image: juplo/deduplicator:streams + ports: + - 8080:8080 + environment: + server.address: deduplicator depends_on: - zookeeper - kafka diff --git a/pom.xml b/pom.xml index 210b8aa..126fb8a 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ org.springframework.boot - spring-boot-starter + spring-boot-starter-web org.apache.kafka diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 8f173f7..5c0f554 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -1,18 +1,22 @@ package de.juplo.demo.kafka.deduplication; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.ValueTransformerWithKey; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -20,15 +24,16 @@ import java.time.Duration; import java.util.Properties; -@Component +@RestController public class Deduplicator { final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class); public final KafkaStreams streams; + public final String host; + public final int port; - - public Deduplicator() + public Deduplicator(ServerProperties serverProperties) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka:9092"); @@ -36,6 +41,10 @@ public class Deduplicator properties.put("default.key.serde", Serdes.StringSerde.class); properties.put("default.value.serde", Serdes.StringSerde.class); + this.host = serverProperties.getAddress().getHostAddress(); + this.port = serverProperties.getPort(); + properties.put("application.server", host + ":" + port); + streams = new KafkaStreams(Deduplicator.buildTopology(), properties); streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> { @@ -81,6 +90,44 @@ public class Deduplicator return builder.build(); } + @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE) + public ResponseEntity getWatermarks(@PathVariable Integer partition) + { + KeyQueryMetadata metadata = + streams.queryMetadataForKey( + DeduplicationTransformer.STORE, + partition, new IntegerSerializer()); + + if (metadata.getActiveHost().port() != port || + !metadata.getActiveHost().host().equals(host)) + { + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .header( + HttpHeaders.LOCATION, + "http://" + + metadata.getActiveHost().host() + + ":" + + metadata.getActiveHost().port() + + "/watermark/" + + partition) + .build(); + } + + ReadOnlyKeyValueStore store = streams.store( + StoreQueryParameters.fromNameAndType( + DeduplicationTransformer.STORE, + QueryableStoreTypes.keyValueStore())); + + Long watermark = store.get(partition); + if (watermark == null) + return ResponseEntity.notFound().build(); + + return ResponseEntity.ok().body(watermark.toString()); + } + + @PostConstruct public void start() { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..7ef7e54 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,3 @@ +server: + address: 127.0.0.1 + port: 8080