X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicationTransformer.java;h=dc888bcc81026b2c95a9b979f14facf5ee4ada77;hp=3bc92d14c3ef6c6c2b035f28609e76186c1f62ab;hb=150907d53cc99a98f4c888eacff892380ffd0feb;hpb=8304884adae524b5ada843b4db66473c94da19ca diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java index 3bc92d1..dc888bc 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java @@ -2,7 +2,7 @@ package de.juplo.demo.kafka.deduplication; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; @@ -11,7 +11,7 @@ import java.util.Collections; @Slf4j -public class DeduplicationTransformer implements ValueTransformerWithKey> +public class DeduplicationTransformer implements ValueTransformer> { public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE"; private ProcessorContext context; @@ -26,7 +26,7 @@ public class DeduplicationTransformer implements ValueTransformerWithKey transform(String key, String value) + public Iterable transform(String value) { String topic = context.topic(); Integer partition = context.partition(); @@ -42,11 +42,7 @@ public class DeduplicationTransformer implements ValueTransformerWithKey