X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicator.java;h=af7da707809e481d06f56ad7f85ca23c4386170e;hp=3585604d02d4032b1563edf8e06e59ae5e12cd52;hb=1fc026384b7699a18323b3ca8106bea86f173349;hpb=5bb3d2908ccfe091084aa8a0a8b1282fb19fb3e1 diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 3585604..af7da70 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -1,11 +1,12 @@ package de.juplo.demo.kafka.deduplication; -import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -35,16 +36,6 @@ public class Deduplicator properties.put("default.key.serde", Serdes.StringSerde.class); properties.put("default.value.serde", Serdes.StringSerde.class); - SequenceNumberExtractor extractor = - new SequenceNumberExtractor() { - @Override - public long extract( - String topic, int partition, long offset, Headers headers, String key, String value) - { - return Long.parseLong(value); - } - }; - StreamsBuilder builder = new StreamsBuilder(); // Create state-store for sequence numbers @@ -59,7 +50,14 @@ public class Deduplicator builder .stream("input") .flatTransformValues( - new DeduplicationTransformerSupplier(extractor), + new ValueTransformerWithKeySupplier>() + { + @Override + public ValueTransformerWithKey> get() + { + return new DeduplicationTransformer(); + } + }, DeduplicationTransformer.STORE) .to("output");