X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicationTransformerIT.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicationTransformerIT.java;h=bf2fc2dd939e82b624ee0e2777e081c05f5565d1;hp=0000000000000000000000000000000000000000;hb=1c19697420360e4fe2e6ab2386ef2d67133711b0;hpb=1fc026384b7699a18323b3ca8106bea86f173349 diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java new file mode 100644 index 0000000..bf2fc2d --- /dev/null +++ b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java @@ -0,0 +1,112 @@ +package de.juplo.demo.kafka.deduplication; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.processor.MockProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.Iterator; + +import static org.assertj.core.api.Assertions.assertThat; + + +@ExtendWith(SpringExtension.class) +public class DeduplicationTransformerIT +{ + @Test + public void test() + { + DeduplicationTransformer transformer = new DeduplicationTransformer(); + MockProcessorContext context = new MockProcessorContext(); + KeyValueStore store = + Stores + .keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(DeduplicationTransformer.STORE), + Serdes.Integer(), + Serdes.Long()) + .withLoggingDisabled() // Changelog is not supported by MockProcessorContext. + .build(); + store.init(context, store); + context.register(store, null); + transformer.init(context); + context.setTopic("foo"); + + Iterator transformed; + + context.setPartition(0); + context.setOffset(1); + transformed = transformer.transform("1", "1").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("1"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(1l); + + context.setPartition(1); + context.setOffset(1); + transformed = transformer.transform("2", "2").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("2"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(1l); + assertThat(store.get(1)).isEqualTo(2l); + + context.setPartition(0); + context.setOffset(2); + transformed = transformer.transform("1", "1").iterator(); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(1l); + assertThat(store.get(1)).isEqualTo(2l); + + context.setPartition(0); + context.setOffset(3); + transformed = transformer.transform("1", "4").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("4"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(4l); + assertThat(store.get(1)).isEqualTo(2l); + + // The order is only guaranteed per partition! + context.setPartition(2); + context.setOffset(1); + transformed = transformer.transform("3", "3").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("3"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(4l); + assertThat(store.get(1)).isEqualTo(2l); + assertThat(store.get(2)).isEqualTo(3l); + + context.setPartition(1); + context.setOffset(2); + transformed = transformer.transform("2", "2").iterator(); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(4l); + assertThat(store.get(1)).isEqualTo(2l); + assertThat(store.get(2)).isEqualTo(3l); + + context.setPartition(2); + context.setOffset(2); + transformed = transformer.transform("3", "5").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("5"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(4l); + assertThat(store.get(1)).isEqualTo(2l); + assertThat(store.get(2)).isEqualTo(5l); + + // The order is only guaranteed per partition! + context.setPartition(1); + context.setOffset(3); + transformed = transformer.transform("2", "6").iterator(); + assertThat(transformed.hasNext()).isTrue(); + assertThat(transformed.next()).isEqualTo("6"); + assertThat(transformed.hasNext()).isFalse(); + assertThat(store.get(0)).isEqualTo(4l); + assertThat(store.get(1)).isEqualTo(6l); + assertThat(store.get(2)).isEqualTo(5l); + } +}