From: Kai Moritz Date: Fri, 9 Oct 2020 15:56:09 +0000 (+0200) Subject: Implemented an unit-test for the transformer X-Git-Tag: streams-deduplicator-1.0.0~9 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ac16b72a27bf9c16dee82558f1f47f6d0f0838bf;p=demos%2Fkafka%2Fdeduplication Implemented an unit-test for the transformer --- diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java new file mode 100644 index 0000000..59f6322 --- /dev/null +++ b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java @@ -0,0 +1,85 @@ +package de.juplo.demo.kafka.deduplication; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.Iterator; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + + +@ExtendWith(SpringExtension.class) +public class DeduplicationTransformerTest +{ + @MockBean + ProcessorContext context; + @MockBean + KeyValueStore store; + + DeduplicationTransformer transformer = new DeduplicationTransformer(); + + + @BeforeEach + public void setUpTransformer() + { + when(context.getStateStore(DeduplicationTransformer.STORE)).thenReturn(store); + transformer.init(context); + } + + + @Test + public void testStoresSequenceNumberAndForwardesValueIfStoreIsEmpty() + { + when(store.get(anyInt())).thenReturn(null); + when(context.partition()).thenReturn(0); + + Iterator result = transformer.transform("foo", "1").iterator(); + + assertThat(result.hasNext()).isTrue(); + assertThat(result.next()).isEqualTo("1"); + assertThat(result.hasNext()).isFalse(); + verify(store, atLeastOnce()).put(eq(0), eq(1l)); + } + + @ParameterizedTest + @ValueSource(longs = { 2, 3, 4, 5, 6, 7 }) + public void testStoresSequenceNumberAndForwardesValueIfSequenceNumberIsGreater(long sequenceNumber) + { + String value = Long.toString(sequenceNumber); + + when(store.get(anyInt())).thenReturn(1l); + when(context.partition()).thenReturn(0); + + Iterator result = transformer.transform("foo", value).iterator(); + + assertThat(result.hasNext()).isTrue(); + assertThat(result.next()).isEqualTo(value); + assertThat(result.hasNext()).isFalse(); + verify(store, atLeastOnce()).put(eq(0), eq(sequenceNumber)); + } + + @ParameterizedTest + @ValueSource(longs = { 1, 2, 3, 4, 5, 6, 7 }) + public void testDropsValueIfSequenceNumberIsNotGreater(long sequenceNumber) + { + String value = Long.toString(sequenceNumber); + + when(store.get(anyInt())).thenReturn(7l); + when(context.partition()).thenReturn(0); + + Iterator result = transformer.transform("foo", value).iterator(); + + assertThat(result.hasNext()).isFalse(); + verify(store, never()).put(eq(0), eq(sequenceNumber)); + } +}