From 150907d53cc99a98f4c888eacff892380ffd0feb Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Oct 2020 20:37:57 +0200 Subject: [PATCH] Further simplified the example: Knowledge about the key is not required --- .../deduplication/DeduplicationTransformer.java | 12 ++++-------- .../demo/kafka/deduplication/Deduplicator.java | 8 ++++---- .../DeduplicationTransformerIT.java | 16 ++++++++-------- .../DeduplicationTransformerTest.java | 6 +++--- 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java index 3bc92d1..dc888bc 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java @@ -2,7 +2,7 @@ package de.juplo.demo.kafka.deduplication; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; @@ -11,7 +11,7 @@ import java.util.Collections; @Slf4j -public class DeduplicationTransformer implements ValueTransformerWithKey> +public class DeduplicationTransformer implements ValueTransformer> { public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE"; private ProcessorContext context; @@ -26,7 +26,7 @@ public class DeduplicationTransformer implements ValueTransformerWithKey transform(String key, String value) + public Iterable transform(String value) { String topic = context.topic(); Integer partition = context.partition(); @@ -42,11 +42,7 @@ public class DeduplicationTransformer implements ValueTransformerWithKeystream("input") .flatTransformValues( - new ValueTransformerWithKeySupplier>() + new ValueTransformerSupplier>() { @Override - public ValueTransformerWithKey> get() + public ValueTransformer> get() { return new DeduplicationTransformer(); } diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java index f88f371..339a301 100644 --- a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java +++ b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java @@ -38,14 +38,14 @@ public class DeduplicationTransformerIT Iterator transformed; context.setPartition(0); - transformed = transformer.transform("1", "1").iterator(); + transformed = transformer.transform("1").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("1"); assertThat(transformed.hasNext()).isFalse(); assertThat(store.get(0)).isEqualTo(1l); context.setPartition(1); - transformed = transformer.transform("2", "2").iterator(); + transformed = transformer.transform("2").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("2"); assertThat(transformed.hasNext()).isFalse(); @@ -53,13 +53,13 @@ public class DeduplicationTransformerIT assertThat(store.get(1)).isEqualTo(2l); context.setPartition(0); - transformed = transformer.transform("1", "1").iterator(); + transformed = transformer.transform("1").iterator(); assertThat(transformed.hasNext()).isFalse(); assertThat(store.get(0)).isEqualTo(1l); assertThat(store.get(1)).isEqualTo(2l); context.setPartition(0); - transformed = transformer.transform("1", "4").iterator(); + transformed = transformer.transform("4").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("4"); assertThat(transformed.hasNext()).isFalse(); @@ -68,7 +68,7 @@ public class DeduplicationTransformerIT // The order is only guaranteed per partition! context.setPartition(2); - transformed = transformer.transform("3", "3").iterator(); + transformed = transformer.transform("3").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("3"); assertThat(transformed.hasNext()).isFalse(); @@ -77,14 +77,14 @@ public class DeduplicationTransformerIT assertThat(store.get(2)).isEqualTo(3l); context.setPartition(1); - transformed = transformer.transform("2", "2").iterator(); + transformed = transformer.transform("2").iterator(); assertThat(transformed.hasNext()).isFalse(); assertThat(store.get(0)).isEqualTo(4l); assertThat(store.get(1)).isEqualTo(2l); assertThat(store.get(2)).isEqualTo(3l); context.setPartition(2); - transformed = transformer.transform("3", "5").iterator(); + transformed = transformer.transform("5").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("5"); assertThat(transformed.hasNext()).isFalse(); @@ -94,7 +94,7 @@ public class DeduplicationTransformerIT // The order is only guaranteed per partition! context.setPartition(1); - transformed = transformer.transform("2", "6").iterator(); + transformed = transformer.transform("6").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("6"); assertThat(transformed.hasNext()).isFalse(); diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java index 59f6322..8862c03 100644 --- a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java +++ b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java @@ -43,7 +43,7 @@ public class DeduplicationTransformerTest when(store.get(anyInt())).thenReturn(null); when(context.partition()).thenReturn(0); - Iterator result = transformer.transform("foo", "1").iterator(); + Iterator result = transformer.transform("1").iterator(); assertThat(result.hasNext()).isTrue(); assertThat(result.next()).isEqualTo("1"); @@ -60,7 +60,7 @@ public class DeduplicationTransformerTest when(store.get(anyInt())).thenReturn(1l); when(context.partition()).thenReturn(0); - Iterator result = transformer.transform("foo", value).iterator(); + Iterator result = transformer.transform(value).iterator(); assertThat(result.hasNext()).isTrue(); assertThat(result.next()).isEqualTo(value); @@ -77,7 +77,7 @@ public class DeduplicationTransformerTest when(store.get(anyInt())).thenReturn(7l); when(context.partition()).thenReturn(0); - Iterator result = transformer.transform("foo", value).iterator(); + Iterator result = transformer.transform(value).iterator(); assertThat(result.hasNext()).isFalse(); verify(store, never()).put(eq(0), eq(sequenceNumber)); -- 2.20.1