From 1fc026384b7699a18323b3ca8106bea86f173349 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 9 Oct 2020 14:18:09 +0200 Subject: [PATCH] Simplified example --- .../DeduplicationTransformer.java | 20 ++++------------ .../DeduplicationTransformerSupplier.java | 23 ------------------- .../kafka/deduplication/Deduplicator.java | 22 ++++++++---------- .../SequenceNumberExtractor.java | 21 ----------------- 4 files changed, 15 insertions(+), 71 deletions(-) delete mode 100644 src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerSupplier.java delete mode 100644 src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java index e07fb37..3bc92d1 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java @@ -11,21 +11,13 @@ import java.util.Collections; @Slf4j -public class DeduplicationTransformer implements ValueTransformerWithKey> +public class DeduplicationTransformer implements ValueTransformerWithKey> { - final SequenceNumberExtractor extractor; - public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE"; private ProcessorContext context; private KeyValueStore store; - public DeduplicationTransformer(SequenceNumberExtractor extractor) - { - this.extractor = extractor; - } - - @Override public void init(ProcessorContext context) { @@ -34,14 +26,14 @@ public class DeduplicationTransformer implements ValueTransformerWithKey transform(K key, V value) + public Iterable transform(String key, String value) { String topic = context.topic(); Integer partition = context.partition(); long offset = context.offset(); Headers headers = context.headers(); - long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value); + long sequenceNumber = Long.parseLong(value); Long seen = store.get(partition); if (seen == null || seen < sequenceNumber) @@ -51,12 +43,10 @@ public class DeduplicationTransformer implements ValueTransformerWithKey implements ValueTransformerWithKeySupplier> -{ - SequenceNumberExtractor extractor; - - - public DeduplicationTransformerSupplier(SequenceNumberExtractor extractor) - { - this.extractor = extractor; - } - - - @Override - public ValueTransformerWithKey> get() - { - return new DeduplicationTransformer(extractor); - } -} diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 3585604..af7da70 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -1,11 +1,12 @@ package de.juplo.demo.kafka.deduplication; -import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -35,16 +36,6 @@ public class Deduplicator properties.put("default.key.serde", Serdes.StringSerde.class); properties.put("default.value.serde", Serdes.StringSerde.class); - SequenceNumberExtractor extractor = - new SequenceNumberExtractor() { - @Override - public long extract( - String topic, int partition, long offset, Headers headers, String key, String value) - { - return Long.parseLong(value); - } - }; - StreamsBuilder builder = new StreamsBuilder(); // Create state-store for sequence numbers @@ -59,7 +50,14 @@ public class Deduplicator builder .stream("input") .flatTransformValues( - new DeduplicationTransformerSupplier(extractor), + new ValueTransformerWithKeySupplier>() + { + @Override + public ValueTransformerWithKey> get() + { + return new DeduplicationTransformer(); + } + }, DeduplicationTransformer.STORE) .to("output"); diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java b/src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java deleted file mode 100644 index 8f74c71..0000000 --- a/src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.demo.kafka.deduplication; - -import org.apache.kafka.common.header.Headers; - - -public interface SequenceNumberExtractor -{ - /** - * Extracts a sequence number from the given value. - * - * The sequence number must be represented as a {@link Long} value. - * - * @param topic The topic, the message was issued on - * @param partition The partition, the message was written to - * @param offset The offset of the message in the partition - * @param key The key of the message - * @param value The value of the message - * @return a unique ID - */ - public long extract(String topic, int partition, long offset, Headers headers, K key, V value); -} -- 2.20.1