X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicationTransformer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicationTransformer.java;h=3bc92d14c3ef6c6c2b035f28609e76186c1f62ab;hp=e07fb372f6586f6570d51327557aec32a9bbf704;hb=1fc026384b7699a18323b3ca8106bea86f173349;hpb=5bb3d2908ccfe091084aa8a0a8b1282fb19fb3e1 diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java index e07fb37..3bc92d1 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java @@ -11,21 +11,13 @@ import java.util.Collections; @Slf4j -public class DeduplicationTransformer implements ValueTransformerWithKey> +public class DeduplicationTransformer implements ValueTransformerWithKey> { - final SequenceNumberExtractor extractor; - public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE"; private ProcessorContext context; private KeyValueStore store; - public DeduplicationTransformer(SequenceNumberExtractor extractor) - { - this.extractor = extractor; - } - - @Override public void init(ProcessorContext context) { @@ -34,14 +26,14 @@ public class DeduplicationTransformer implements ValueTransformerWithKey transform(K key, V value) + public Iterable transform(String key, String value) { String topic = context.topic(); Integer partition = context.partition(); long offset = context.offset(); Headers headers = context.headers(); - long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value); + long sequenceNumber = Long.parseLong(value); Long seen = store.get(partition); if (seen == null || seen < sequenceNumber) @@ -51,12 +43,10 @@ public class DeduplicationTransformer implements ValueTransformerWithKey