X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicator.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicator.java;h=5c0f554a228cc5aac7d5e8e09a312dbf9b1c2c7b;hp=8f173f7ad9b9be419534e77b82d534312bf45ae0;hb=85905a556b97077ef99332928eb1401864af24e5;hpb=76b0d9606d3d236a98fe660e092c1798c71fe464 diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 8f173f7..5c0f554 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -1,18 +1,22 @@ package de.juplo.demo.kafka.deduplication; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.ValueTransformerWithKey; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -20,15 +24,16 @@ import java.time.Duration; import java.util.Properties; -@Component +@RestController public class Deduplicator { final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class); public final KafkaStreams streams; + public final String host; + public final int port; - - public Deduplicator() + public Deduplicator(ServerProperties serverProperties) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka:9092"); @@ -36,6 +41,10 @@ public class Deduplicator properties.put("default.key.serde", Serdes.StringSerde.class); properties.put("default.value.serde", Serdes.StringSerde.class); + this.host = serverProperties.getAddress().getHostAddress(); + this.port = serverProperties.getPort(); + properties.put("application.server", host + ":" + port); + streams = new KafkaStreams(Deduplicator.buildTopology(), properties); streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> { @@ -81,6 +90,44 @@ public class Deduplicator return builder.build(); } + @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE) + public ResponseEntity getWatermarks(@PathVariable Integer partition) + { + KeyQueryMetadata metadata = + streams.queryMetadataForKey( + DeduplicationTransformer.STORE, + partition, new IntegerSerializer()); + + if (metadata.getActiveHost().port() != port || + !metadata.getActiveHost().host().equals(host)) + { + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .header( + HttpHeaders.LOCATION, + "http://" + + metadata.getActiveHost().host() + + ":" + + metadata.getActiveHost().port() + + "/watermark/" + + partition) + .build(); + } + + ReadOnlyKeyValueStore store = streams.store( + StoreQueryParameters.fromNameAndType( + DeduplicationTransformer.STORE, + QueryableStoreTypes.keyValueStore())); + + Long watermark = store.get(partition); + if (watermark == null) + return ResponseEntity.notFound().build(); + + return ResponseEntity.ok().body(watermark.toString()); + } + + @PostConstruct public void start() {