Simplified example
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / DeduplicationTransformer.java
index e07fb37..3bc92d1 100644 (file)
@@ -11,21 +11,13 @@ import java.util.Collections;
 
 
 @Slf4j
-public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>>
+public class DeduplicationTransformer implements ValueTransformerWithKey<String, String, Iterable<String>>
 {
-  final SequenceNumberExtractor<K, V> extractor;
-
   public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
   private ProcessorContext context;
   private KeyValueStore<Integer, Long> store;
 
 
-  public DeduplicationTransformer(SequenceNumberExtractor<K, V> extractor)
-  {
-    this.extractor = extractor;
-  }
-
-
   @Override
   public void init(ProcessorContext context)
   {
@@ -34,14 +26,14 @@ public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K
   }
 
   @Override
-  public Iterable<V> transform(K key, V value)
+  public Iterable<String> transform(String key, String value)
   {
     String topic = context.topic();
     Integer partition = context.partition();
     long offset = context.offset();
     Headers headers = context.headers();
 
-    long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value);
+    long sequenceNumber = Long.parseLong(value);
 
     Long seen = store.get(partition);
     if (seen == null || seen < sequenceNumber)
@@ -51,12 +43,10 @@ public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K
     }
 
     log.info(
-        "ignoring message for key {} with sequence-number {} <= {} found at offset {} of partition {}",
+        "ignoring message for key {} with sequence-number {} <= {}",
         key,
         sequenceNumber,
-        seen,
-        offset,
-        partition);
+        seen);
 
     // Signal, that the message has already been seen.
     // Downstream has to filter the null-values...