X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicator.java;h=5c0f554a228cc5aac7d5e8e09a312dbf9b1c2c7b;hp=3585604d02d4032b1563edf8e06e59ae5e12cd52;hb=85905a556b97077ef99332928eb1401864af24e5;hpb=5bb3d2908ccfe091084aa8a0a8b1282fb19fb3e1 diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 3585604..5c0f554 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -1,17 +1,22 @@ package de.juplo.demo.kafka.deduplication; -import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; +import org.apache.kafka.streams.state.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -19,15 +24,16 @@ import java.time.Duration; import java.util.Properties; -@Component +@RestController public class Deduplicator { final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class); public final KafkaStreams streams; + public final String host; + public final int port; - - public Deduplicator() + public Deduplicator(ServerProperties serverProperties) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka:9092"); @@ -35,16 +41,27 @@ public class Deduplicator properties.put("default.key.serde", Serdes.StringSerde.class); properties.put("default.value.serde", Serdes.StringSerde.class); - SequenceNumberExtractor extractor = - new SequenceNumberExtractor() { - @Override - public long extract( - String topic, int partition, long offset, Headers headers, String key, String value) - { - return Long.parseLong(value); - } - }; + this.host = serverProperties.getAddress().getHostAddress(); + this.port = serverProperties.getPort(); + properties.put("application.server", host + ":" + port); + + streams = new KafkaStreams(Deduplicator.buildTopology(), properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(Duration.ofSeconds(5)); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + static Topology buildTopology() + { StreamsBuilder builder = new StreamsBuilder(); // Create state-store for sequence numbers @@ -59,24 +76,55 @@ public class Deduplicator builder .stream("input") .flatTransformValues( - new DeduplicationTransformerSupplier(extractor), + new ValueTransformerWithKeySupplier>() + { + @Override + public ValueTransformerWithKey> get() + { + return new DeduplicationTransformer(); + } + }, DeduplicationTransformer.STORE) .to("output"); - Topology topology = builder.build(); - streams = new KafkaStreams(topology, properties); - streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + return builder.build(); + } + + @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE) + public ResponseEntity getWatermarks(@PathVariable Integer partition) + { + KeyQueryMetadata metadata = + streams.queryMetadataForKey( + DeduplicationTransformer.STORE, + partition, new IntegerSerializer()); + + if (metadata.getActiveHost().port() != port || + !metadata.getActiveHost().host().equals(host)) { - LOG.error("Unexpected error in thread {}: {}", t, e.toString()); - try - { - streams.close(Duration.ofSeconds(5)); - } - catch (Exception ex) - { - LOG.error("Could not close KafkaStreams!", ex); - } - }); + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .header( + HttpHeaders.LOCATION, + "http://" + + metadata.getActiveHost().host() + + ":" + + metadata.getActiveHost().port() + + "/watermark/" + + partition) + .build(); + } + + ReadOnlyKeyValueStore store = streams.store( + StoreQueryParameters.fromNameAndType( + DeduplicationTransformer.STORE, + QueryableStoreTypes.keyValueStore())); + + Long watermark = store.get(partition); + if (watermark == null) + return ResponseEntity.notFound().build(); + + return ResponseEntity.ok().body(watermark.toString()); }