Implemented an integration-test for the transformer
[demos/kafka/deduplication] / src / test / java / de / juplo / demo / kafka / deduplication / DeduplicationTransformerIT.java
diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java
new file mode 100644 (file)
index 0000000..bf2fc2d
--- /dev/null
@@ -0,0 +1,112 @@
+package de.juplo.demo.kafka.deduplication;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.util.Iterator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@ExtendWith(SpringExtension.class)
+public class DeduplicationTransformerIT
+{
+  @Test
+  public void test()
+  {
+    DeduplicationTransformer transformer = new DeduplicationTransformer();
+    MockProcessorContext context = new MockProcessorContext();
+    KeyValueStore<Integer, Long> store =
+        Stores
+            .keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore(DeduplicationTransformer.STORE),
+                Serdes.Integer(),
+                Serdes.Long())
+            .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
+            .build();
+    store.init(context, store);
+    context.register(store, null);
+    transformer.init(context);
+    context.setTopic("foo");
+
+    Iterator<String> transformed;
+
+    context.setPartition(0);
+    context.setOffset(1);
+    transformed = transformer.transform("1", "1").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("1");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(1l);
+
+    context.setPartition(1);
+    context.setOffset(1);
+    transformed = transformer.transform("2", "2").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("2");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(1l);
+    assertThat(store.get(1)).isEqualTo(2l);
+
+    context.setPartition(0);
+    context.setOffset(2);
+    transformed = transformer.transform("1", "1").iterator();
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(1l);
+    assertThat(store.get(1)).isEqualTo(2l);
+
+    context.setPartition(0);
+    context.setOffset(3);
+    transformed = transformer.transform("1", "4").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("4");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(4l);
+    assertThat(store.get(1)).isEqualTo(2l);
+
+    // The order is only guaranteed per partition!
+    context.setPartition(2);
+    context.setOffset(1);
+    transformed = transformer.transform("3", "3").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("3");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(4l);
+    assertThat(store.get(1)).isEqualTo(2l);
+    assertThat(store.get(2)).isEqualTo(3l);
+
+    context.setPartition(1);
+    context.setOffset(2);
+    transformed = transformer.transform("2", "2").iterator();
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(4l);
+    assertThat(store.get(1)).isEqualTo(2l);
+    assertThat(store.get(2)).isEqualTo(3l);
+
+    context.setPartition(2);
+    context.setOffset(2);
+    transformed = transformer.transform("3", "5").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("5");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(4l);
+    assertThat(store.get(1)).isEqualTo(2l);
+    assertThat(store.get(2)).isEqualTo(5l);
+
+    // The order is only guaranteed per partition!
+    context.setPartition(1);
+    context.setOffset(3);
+    transformed = transformer.transform("2", "6").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("6");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(4l);
+    assertThat(store.get(1)).isEqualTo(6l);
+    assertThat(store.get(2)).isEqualTo(5l);
+  }
+}