X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicator.java;h=5c0f554a228cc5aac7d5e8e09a312dbf9b1c2c7b;hp=af7da707809e481d06f56ad7f85ca23c4386170e;hb=85905a556b97077ef99332928eb1401864af24e5;hpb=1fc026384b7699a18323b3ca8106bea86f173349 diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index af7da70..5c0f554 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -1,18 +1,22 @@ package de.juplo.demo.kafka.deduplication; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.ValueTransformerWithKey; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -20,15 +24,16 @@ import java.time.Duration; import java.util.Properties; -@Component +@RestController public class Deduplicator { final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class); public final KafkaStreams streams; + public final String host; + public final int port; - - public Deduplicator() + public Deduplicator(ServerProperties serverProperties) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka:9092"); @@ -36,6 +41,27 @@ public class Deduplicator properties.put("default.key.serde", Serdes.StringSerde.class); properties.put("default.value.serde", Serdes.StringSerde.class); + this.host = serverProperties.getAddress().getHostAddress(); + this.port = serverProperties.getPort(); + properties.put("application.server", host + ":" + port); + + streams = new KafkaStreams(Deduplicator.buildTopology(), properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(Duration.ofSeconds(5)); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + static Topology buildTopology() + { StreamsBuilder builder = new StreamsBuilder(); // Create state-store for sequence numbers @@ -61,20 +87,44 @@ public class Deduplicator DeduplicationTransformer.STORE) .to("output"); - Topology topology = builder.build(); - streams = new KafkaStreams(topology, properties); - streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + return builder.build(); + } + + @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE) + public ResponseEntity getWatermarks(@PathVariable Integer partition) + { + KeyQueryMetadata metadata = + streams.queryMetadataForKey( + DeduplicationTransformer.STORE, + partition, new IntegerSerializer()); + + if (metadata.getActiveHost().port() != port || + !metadata.getActiveHost().host().equals(host)) { - LOG.error("Unexpected error in thread {}: {}", t, e.toString()); - try - { - streams.close(Duration.ofSeconds(5)); - } - catch (Exception ex) - { - LOG.error("Could not close KafkaStreams!", ex); - } - }); + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .header( + HttpHeaders.LOCATION, + "http://" + + metadata.getActiveHost().host() + + ":" + + metadata.getActiveHost().port() + + "/watermark/" + + partition) + .build(); + } + + ReadOnlyKeyValueStore store = streams.store( + StoreQueryParameters.fromNameAndType( + DeduplicationTransformer.STORE, + QueryableStoreTypes.keyValueStore())); + + Long watermark = store.get(partition); + if (watermark == null) + return ResponseEntity.notFound().build(); + + return ResponseEntity.ok().body(watermark.toString()); }