Further simplified the example: Knowledge about the key is not required
authorKai Moritz <kai@juplo.de>
Sat, 10 Oct 2020 18:37:57 +0000 (20:37 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Oct 2020 19:35:42 +0000 (21:35 +0200)
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java
src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java

index 3bc92d1..dc888bc 100644 (file)
@@ -2,7 +2,7 @@ package de.juplo.demo.kafka.deduplication;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -11,7 +11,7 @@ import java.util.Collections;
 
 
 @Slf4j
-public class DeduplicationTransformer implements ValueTransformerWithKey<String, String, Iterable<String>>
+public class DeduplicationTransformer implements ValueTransformer<String, Iterable<String>>
 {
   public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
   private ProcessorContext context;
@@ -26,7 +26,7 @@ public class DeduplicationTransformer implements ValueTransformerWithKey<String,
   }
 
   @Override
-  public Iterable<String> transform(String key, String value)
+  public Iterable<String> transform(String value)
   {
     String topic = context.topic();
     Integer partition = context.partition();
@@ -42,11 +42,7 @@ public class DeduplicationTransformer implements ValueTransformerWithKey<String,
       return Arrays.asList(value);
     }
 
-    log.info(
-        "ignoring message for key {} with sequence-number {} <= {}",
-        key,
-        sequenceNumber,
-        seen);
+    log.info("ignoring message with sequence-number {} <= {}", sequenceNumber, seen);
 
     // Signal, that the message has already been seen.
     // Downstream has to filter the null-values...
index 04a2d82..38ec22a 100644 (file)
@@ -4,8 +4,8 @@ package de.juplo.demo.kafka.deduplication;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.state.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,10 +79,10 @@ public class Deduplicator
     builder
         .<String, String>stream("input")
         .flatTransformValues(
-            new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
+            new ValueTransformerSupplier<String, Iterable<String>>()
             {
               @Override
-              public ValueTransformerWithKey<String, String, Iterable<String>> get()
+              public ValueTransformer<String, Iterable<String>> get()
               {
                 return new DeduplicationTransformer();
               }
index f88f371..339a301 100644 (file)
@@ -38,14 +38,14 @@ public class DeduplicationTransformerIT
     Iterator<String> transformed;
 
     context.setPartition(0);
-    transformed = transformer.transform("1", "1").iterator();
+    transformed = transformer.transform("1").iterator();
     assertThat(transformed.hasNext()).isTrue();
     assertThat(transformed.next()).isEqualTo("1");
     assertThat(transformed.hasNext()).isFalse();
     assertThat(store.get(0)).isEqualTo(1l);
 
     context.setPartition(1);
-    transformed = transformer.transform("2", "2").iterator();
+    transformed = transformer.transform("2").iterator();
     assertThat(transformed.hasNext()).isTrue();
     assertThat(transformed.next()).isEqualTo("2");
     assertThat(transformed.hasNext()).isFalse();
@@ -53,13 +53,13 @@ public class DeduplicationTransformerIT
     assertThat(store.get(1)).isEqualTo(2l);
 
     context.setPartition(0);
-    transformed = transformer.transform("1", "1").iterator();
+    transformed = transformer.transform("1").iterator();
     assertThat(transformed.hasNext()).isFalse();
     assertThat(store.get(0)).isEqualTo(1l);
     assertThat(store.get(1)).isEqualTo(2l);
 
     context.setPartition(0);
-    transformed = transformer.transform("1", "4").iterator();
+    transformed = transformer.transform("4").iterator();
     assertThat(transformed.hasNext()).isTrue();
     assertThat(transformed.next()).isEqualTo("4");
     assertThat(transformed.hasNext()).isFalse();
@@ -68,7 +68,7 @@ public class DeduplicationTransformerIT
 
     // The order is only guaranteed per partition!
     context.setPartition(2);
-    transformed = transformer.transform("3", "3").iterator();
+    transformed = transformer.transform("3").iterator();
     assertThat(transformed.hasNext()).isTrue();
     assertThat(transformed.next()).isEqualTo("3");
     assertThat(transformed.hasNext()).isFalse();
@@ -77,14 +77,14 @@ public class DeduplicationTransformerIT
     assertThat(store.get(2)).isEqualTo(3l);
 
     context.setPartition(1);
-    transformed = transformer.transform("2", "2").iterator();
+    transformed = transformer.transform("2").iterator();
     assertThat(transformed.hasNext()).isFalse();
     assertThat(store.get(0)).isEqualTo(4l);
     assertThat(store.get(1)).isEqualTo(2l);
     assertThat(store.get(2)).isEqualTo(3l);
 
     context.setPartition(2);
-    transformed = transformer.transform("3", "5").iterator();
+    transformed = transformer.transform("5").iterator();
     assertThat(transformed.hasNext()).isTrue();
     assertThat(transformed.next()).isEqualTo("5");
     assertThat(transformed.hasNext()).isFalse();
@@ -94,7 +94,7 @@ public class DeduplicationTransformerIT
 
     // The order is only guaranteed per partition!
     context.setPartition(1);
-    transformed = transformer.transform("2", "6").iterator();
+    transformed = transformer.transform("6").iterator();
     assertThat(transformed.hasNext()).isTrue();
     assertThat(transformed.next()).isEqualTo("6");
     assertThat(transformed.hasNext()).isFalse();
index 59f6322..8862c03 100644 (file)
@@ -43,7 +43,7 @@ public class DeduplicationTransformerTest
     when(store.get(anyInt())).thenReturn(null);
     when(context.partition()).thenReturn(0);
 
-    Iterator<String> result = transformer.transform("foo", "1").iterator();
+    Iterator<String> result = transformer.transform("1").iterator();
 
     assertThat(result.hasNext()).isTrue();
     assertThat(result.next()).isEqualTo("1");
@@ -60,7 +60,7 @@ public class DeduplicationTransformerTest
     when(store.get(anyInt())).thenReturn(1l);
     when(context.partition()).thenReturn(0);
 
-    Iterator<String> result = transformer.transform("foo", value).iterator();
+    Iterator<String> result = transformer.transform(value).iterator();
 
     assertThat(result.hasNext()).isTrue();
     assertThat(result.next()).isEqualTo(value);
@@ -77,7 +77,7 @@ public class DeduplicationTransformerTest
     when(store.get(anyInt())).thenReturn(7l);
     when(context.partition()).thenReturn(0);
 
-    Iterator<String> result = transformer.transform("foo", value).iterator();
+    Iterator<String> result = transformer.transform(value).iterator();
 
     assertThat(result.hasNext()).isFalse();
     verify(store, never()).put(eq(0), eq(sequenceNumber));