Implemented an integration-test for the transformer
authorKai Moritz <kai@juplo.de>
Fri, 9 Oct 2020 12:59:42 +0000 (14:59 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Oct 2020 19:32:47 +0000 (21:32 +0200)
pom.xml
src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 6be4c4b..210b8aa 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-streams-test-utils</artifactId>
+      <scope>test</scope>
+    </dependency>
 
   </dependencies>
 
 
   </dependencies>
 
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
       </plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                       </plugin>
     </plugins>
   </build>
 
     </plugins>
   </build>
 
diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java
new file mode 100644 (file)
index 0000000..bf2fc2d
--- /dev/null
@@ -0,0 +1,112 @@
+package de.juplo.demo.kafka.deduplication;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.util.Iterator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@ExtendWith(SpringExtension.class)
+public class DeduplicationTransformerIT
+{
+  @Test
+  public void test()
+  {
+    DeduplicationTransformer transformer = new DeduplicationTransformer();
+    MockProcessorContext context = new MockProcessorContext();
+    KeyValueStore<Integer, Long> store =
+        Stores
+            .keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore(DeduplicationTransformer.STORE),
+                Serdes.Integer(),
+                Serdes.Long())
+            .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
+            .build();
+    store.init(context, store);
+    context.register(store, null);
+    transformer.init(context);
+    context.setTopic("foo");
+
+    Iterator<String> transformed;
+
+    context.setPartition(0);
+    context.setOffset(1);
+    transformed = transformer.transform("1", "1").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("1");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(1l);
+
+    context.setPartition(1);
+    context.setOffset(1);
+    transformed = transformer.transform("2", "2").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("2");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(1l);
+    assertThat(store.get(1)).isEqualTo(2l);
+
+    context.setPartition(0);
+    context.setOffset(2);
+    transformed = transformer.transform("1", "1").iterator();
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(1l);
+    assertThat(store.get(1)).isEqualTo(2l);
+
+    context.setPartition(0);
+    context.setOffset(3);
+    transformed = transformer.transform("1", "4").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("4");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(4l);
+    assertThat(store.get(1)).isEqualTo(2l);
+
+    // The order is only guaranteed per partition!
+    context.setPartition(2);
+    context.setOffset(1);
+    transformed = transformer.transform("3", "3").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("3");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(4l);
+    assertThat(store.get(1)).isEqualTo(2l);
+    assertThat(store.get(2)).isEqualTo(3l);
+
+    context.setPartition(1);
+    context.setOffset(2);
+    transformed = transformer.transform("2", "2").iterator();
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(4l);
+    assertThat(store.get(1)).isEqualTo(2l);
+    assertThat(store.get(2)).isEqualTo(3l);
+
+    context.setPartition(2);
+    context.setOffset(2);
+    transformed = transformer.transform("3", "5").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("5");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(4l);
+    assertThat(store.get(1)).isEqualTo(2l);
+    assertThat(store.get(2)).isEqualTo(5l);
+
+    // The order is only guaranteed per partition!
+    context.setPartition(1);
+    context.setOffset(3);
+    transformed = transformer.transform("2", "6").iterator();
+    assertThat(transformed.hasNext()).isTrue();
+    assertThat(transformed.next()).isEqualTo("6");
+    assertThat(transformed.hasNext()).isFalse();
+    assertThat(store.get(0)).isEqualTo(4l);
+    assertThat(store.get(1)).isEqualTo(6l);
+    assertThat(store.get(2)).isEqualTo(5l);
+  }
+}