Simplified example
authorKai Moritz <kai@juplo.de>
Fri, 9 Oct 2020 12:18:09 +0000 (14:18 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Oct 2020 19:31:44 +0000 (21:31 +0200)
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerSupplier.java [deleted file]
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java [deleted file]

index e07fb37..3bc92d1 100644 (file)
@@ -11,21 +11,13 @@ import java.util.Collections;
 
 
 @Slf4j
-public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>>
+public class DeduplicationTransformer implements ValueTransformerWithKey<String, String, Iterable<String>>
 {
-  final SequenceNumberExtractor<K, V> extractor;
-
   public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
   private ProcessorContext context;
   private KeyValueStore<Integer, Long> store;
 
 
-  public DeduplicationTransformer(SequenceNumberExtractor<K, V> extractor)
-  {
-    this.extractor = extractor;
-  }
-
-
   @Override
   public void init(ProcessorContext context)
   {
@@ -34,14 +26,14 @@ public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K
   }
 
   @Override
-  public Iterable<V> transform(K key, V value)
+  public Iterable<String> transform(String key, String value)
   {
     String topic = context.topic();
     Integer partition = context.partition();
     long offset = context.offset();
     Headers headers = context.headers();
 
-    long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value);
+    long sequenceNumber = Long.parseLong(value);
 
     Long seen = store.get(partition);
     if (seen == null || seen < sequenceNumber)
@@ -51,12 +43,10 @@ public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K
     }
 
     log.info(
-        "ignoring message for key {} with sequence-number {} <= {} found at offset {} of partition {}",
+        "ignoring message for key {} with sequence-number {} <= {}",
         key,
         sequenceNumber,
-        seen,
-        offset,
-        partition);
+        seen);
 
     // Signal, that the message has already been seen.
     // Downstream has to filter the null-values...
diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerSupplier.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerSupplier.java
deleted file mode 100644 (file)
index 37210e8..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-package de.juplo.demo.kafka.deduplication;
-
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-
-
-public class DeduplicationTransformerSupplier<K, V> implements ValueTransformerWithKeySupplier<K, V, Iterable<V>>
-{
-  SequenceNumberExtractor<K, V> extractor;
-
-
-  public DeduplicationTransformerSupplier(SequenceNumberExtractor<K, V> extractor)
-  {
-    this.extractor = extractor;
-  }
-
-
-  @Override
-  public ValueTransformerWithKey<K, V, Iterable<V>> get()
-  {
-    return new DeduplicationTransformer<K, V>(extractor);
-  }
-}
index 3585604..af7da70 100644 (file)
@@ -1,11 +1,12 @@
 package de.juplo.demo.kafka.deduplication;
 
 
-import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -35,16 +36,6 @@ public class Deduplicator
     properties.put("default.key.serde", Serdes.StringSerde.class);
     properties.put("default.value.serde", Serdes.StringSerde.class);
 
-    SequenceNumberExtractor<String, String> extractor =
-        new SequenceNumberExtractor<String, String>() {
-          @Override
-          public long extract(
-              String topic, int partition, long offset, Headers headers, String key, String value)
-          {
-            return Long.parseLong(value);
-          }
-        };
-
     StreamsBuilder builder = new StreamsBuilder();
 
     // Create state-store for sequence numbers
@@ -59,7 +50,14 @@ public class Deduplicator
     builder
         .<String, String>stream("input")
         .flatTransformValues(
-            new DeduplicationTransformerSupplier<String, String>(extractor),
+            new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
+            {
+              @Override
+              public ValueTransformerWithKey<String, String, Iterable<String>> get()
+              {
+                return new DeduplicationTransformer();
+              }
+            },
             DeduplicationTransformer.STORE)
         .to("output");
 
diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java b/src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java
deleted file mode 100644 (file)
index 8f74c71..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-package de.juplo.demo.kafka.deduplication;
-
-import org.apache.kafka.common.header.Headers;
-
-
-public interface SequenceNumberExtractor<K,V>
-{
-  /**
-   * Extracts a sequence number from the given value.
-   *
-   * The sequence number must be represented as a {@link Long} value.
-   *
-   * @param topic The topic, the message was issued on
-   * @param partition The partition, the message was written to
-   * @param offset The offset of the message in the partition
-   * @param key The key of the message
-   * @param value The value of the message
-   * @return a unique ID
-   */
-  public long extract(String topic, int partition, long offset, Headers headers, K key, V value);
-}