Further simplified the example: Knowledge about the key is not required
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / DeduplicationTransformer.java
index 3bc92d1..dc888bc 100644 (file)
@@ -2,7 +2,7 @@ package de.juplo.demo.kafka.deduplication;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -11,7 +11,7 @@ import java.util.Collections;
 
 
 @Slf4j
-public class DeduplicationTransformer implements ValueTransformerWithKey<String, String, Iterable<String>>
+public class DeduplicationTransformer implements ValueTransformer<String, Iterable<String>>
 {
   public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
   private ProcessorContext context;
@@ -26,7 +26,7 @@ public class DeduplicationTransformer implements ValueTransformerWithKey<String,
   }
 
   @Override
-  public Iterable<String> transform(String key, String value)
+  public Iterable<String> transform(String value)
   {
     String topic = context.topic();
     Integer partition = context.partition();
@@ -42,11 +42,7 @@ public class DeduplicationTransformer implements ValueTransformerWithKey<String,
       return Arrays.asList(value);
     }
 
-    log.info(
-        "ignoring message for key {} with sequence-number {} <= {}",
-        key,
-        sequenceNumber,
-        seen);
+    log.info("ignoring message with sequence-number {} <= {}", sequenceNumber, seen);
 
     // Signal, that the message has already been seen.
     // Downstream has to filter the null-values...