Simplified example
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / Deduplicator.java
index 3585604..af7da70 100644 (file)
@@ -1,11 +1,12 @@
 package de.juplo.demo.kafka.deduplication;
 
 
-import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -35,16 +36,6 @@ public class Deduplicator
     properties.put("default.key.serde", Serdes.StringSerde.class);
     properties.put("default.value.serde", Serdes.StringSerde.class);
 
-    SequenceNumberExtractor<String, String> extractor =
-        new SequenceNumberExtractor<String, String>() {
-          @Override
-          public long extract(
-              String topic, int partition, long offset, Headers headers, String key, String value)
-          {
-            return Long.parseLong(value);
-          }
-        };
-
     StreamsBuilder builder = new StreamsBuilder();
 
     // Create state-store for sequence numbers
@@ -59,7 +50,14 @@ public class Deduplicator
     builder
         .<String, String>stream("input")
         .flatTransformValues(
-            new DeduplicationTransformerSupplier<String, String>(extractor),
+            new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
+            {
+              @Override
+              public ValueTransformerWithKey<String, String, Iterable<String>> get()
+              {
+                return new DeduplicationTransformer();
+              }
+            },
             DeduplicationTransformer.STORE)
         .to("output");