From: Kai Moritz Date: Fri, 9 Oct 2020 12:59:42 +0000 (+0200) Subject: Implemented an integration-test for the transformer X-Git-Tag: streams-deduplicator-1.0.0~11 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=17baf236aabdddc2fcb7d74d5304a293b9e12fc9;p=demos%2Fkafka%2Fdeduplication Implemented an integration-test for the transformer --- diff --git a/pom.xml b/pom.xml index 97c8f2d..bcb083b 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,11 @@ spring-boot-starter-test test + + org.apache.kafka + kafka-streams-test-utils + test + @@ -46,6 +51,10 @@ org.springframework.boot spring-boot-maven-plugin + + org.apache.maven.plugins + maven-failsafe-plugin + diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java new file mode 100644 index 0000000..bf2fc2d --- /dev/null +++ b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java @@ -0,0 +1,112 @@ +package de.juplo.demo.kafka.deduplication; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.processor.MockProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.Iterator; + +import static org.assertj.core.api.Assertions.assertThat; + + +@ExtendWith(SpringExtension.class) +public class DeduplicationTransformerIT +{ + @Test + public void test() + { + DeduplicationTransformer transformer = new DeduplicationTransformer(); + MockProcessorContext context = new MockProcessorContext(); + KeyValueStore store = + Stores + .keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(DeduplicationTransformer.STORE), + Serdes.Integer(), + Serdes.Long()) + .withLoggingDisabled() // Changelog is not supported by MockProcessorContext. + .build(); + store.init(context, store); + context.register(store, null); + transformer.init(context); + context.setTopic("foo"); + + Iterator transformed; + + context.setPartition(0); + context.setOffset(1); + transformed = transformer.transform("1", "1").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("1"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(1l); + + context.setPartition(1); + context.setOffset(1); + transformed = transformer.transform("2", "2").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("2"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(1l); + assertThat(store.get(1)).isEqualTo(2l); + + context.setPartition(0); + context.setOffset(2); + transformed = transformer.transform("1", "1").iterator(); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(1l); + assertThat(store.get(1)).isEqualTo(2l); + + context.setPartition(0); + context.setOffset(3); + transformed = transformer.transform("1", "4").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("4"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(4l); + assertThat(store.get(1)).isEqualTo(2l); + + // The order is only guaranteed per partition! + context.setPartition(2); + context.setOffset(1); + transformed = transformer.transform("3", "3").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("3"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(4l); + assertThat(store.get(1)).isEqualTo(2l); + assertThat(store.get(2)).isEqualTo(3l); + + context.setPartition(1); + context.setOffset(2); + transformed = transformer.transform("2", "2").iterator(); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(4l); + assertThat(store.get(1)).isEqualTo(2l); + assertThat(store.get(2)).isEqualTo(3l); + + context.setPartition(2); + context.setOffset(2); + transformed = transformer.transform("3", "5").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("5"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(4l); + assertThat(store.get(1)).isEqualTo(2l); + assertThat(store.get(2)).isEqualTo(5l); + + // The order is only guaranteed per partition! + context.setPartition(1); + context.setOffset(3); + transformed = transformer.transform("2", "6").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("6"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(4l); + assertThat(store.get(1)).isEqualTo(6l); + assertThat(store.get(2)).isEqualTo(5l); + } +}