Implemented a unit test for the transformer
[demos/kafka/deduplication] / src / test / java / de / juplo / demo / kafka / deduplication / DeduplicationTransformerTest.java
1 package de.juplo.demo.kafka.deduplication;
2
3 import org.apache.kafka.streams.processor.ProcessorContext;
4 import org.apache.kafka.streams.state.KeyValueStore;
5 import org.junit.jupiter.api.BeforeEach;
6 import org.junit.jupiter.api.Test;
7 import org.junit.jupiter.api.extension.ExtendWith;
8 import org.junit.jupiter.params.ParameterizedTest;
9 import org.junit.jupiter.params.provider.ValueSource;
10 import org.springframework.boot.test.mock.mockito.MockBean;
11 import org.springframework.test.context.junit.jupiter.SpringExtension;
12
13 import java.util.Iterator;
14
15 import static org.assertj.core.api.Assertions.assertThat;
16 import static org.mockito.ArgumentMatchers.anyInt;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.Mockito.*;
19
20
21 @ExtendWith(SpringExtension.class)
22 public class DeduplicationTransformerTest
23 {
24   @MockBean
25   ProcessorContext context;
26   @MockBean
27   KeyValueStore<Integer, Long> store;
28
29   DeduplicationTransformer transformer = new DeduplicationTransformer();
30
31
32   @BeforeEach
33   public void setUpTransformer()
34   {
35     when(context.getStateStore(DeduplicationTransformer.STORE)).thenReturn(store);
36     transformer.init(context);
37   }
38
39
40   @Test
41   public void testStoresSequenceNumberAndForwardesValueIfStoreIsEmpty()
42   {
43     when(store.get(anyInt())).thenReturn(null);
44     when(context.partition()).thenReturn(0);
45
46     Iterator<String> result = transformer.transform("foo", "1").iterator();
47
48     assertThat(result.hasNext()).isTrue();
49     assertThat(result.next()).isEqualTo("1");
50     assertThat(result.hasNext()).isFalse();
51     verify(store, atLeastOnce()).put(eq(0), eq(1l));
52   }
53
54   @ParameterizedTest
55   @ValueSource(longs = { 2, 3, 4, 5, 6, 7 })
56   public void testStoresSequenceNumberAndForwardesValueIfSequenceNumberIsGreater(long sequenceNumber)
57   {
58     String value = Long.toString(sequenceNumber);
59
60     when(store.get(anyInt())).thenReturn(1l);
61     when(context.partition()).thenReturn(0);
62
63     Iterator<String> result = transformer.transform("foo", value).iterator();
64
65     assertThat(result.hasNext()).isTrue();
66     assertThat(result.next()).isEqualTo(value);
67     assertThat(result.hasNext()).isFalse();
68     verify(store, atLeastOnce()).put(eq(0), eq(sequenceNumber));
69   }
70
71   @ParameterizedTest
72   @ValueSource(longs = { 1, 2, 3, 4, 5, 6, 7 })
73   public void testDropsValueIfSequenceNumberIsNotGreater(long sequenceNumber)
74   {
75     String value = Long.toString(sequenceNumber);
76
77     when(store.get(anyInt())).thenReturn(7l);
78     when(context.partition()).thenReturn(0);
79
80     Iterator<String> result = transformer.transform("foo", value).iterator();
81
82     assertThat(result.hasNext()).isFalse();
83     verify(store, never()).put(eq(0), eq(sequenceNumber));
84   }
85 }