Implemented an integration-test for the transformer
[demos/kafka/deduplication] / src / test / java / de / juplo / demo / kafka / deduplication / DeduplicationTransformerIT.java
1 package de.juplo.demo.kafka.deduplication;
2
3 import org.apache.kafka.common.serialization.Serdes;
4 import org.apache.kafka.streams.processor.MockProcessorContext;
5 import org.apache.kafka.streams.state.KeyValueStore;
6 import org.apache.kafka.streams.state.Stores;
7 import org.junit.jupiter.api.Test;
8 import org.junit.jupiter.api.extension.ExtendWith;
9 import org.springframework.test.context.junit.jupiter.SpringExtension;
10
11 import java.util.Iterator;
12
13 import static org.assertj.core.api.Assertions.assertThat;
14
15
16 @ExtendWith(SpringExtension.class)
17 public class DeduplicationTransformerIT
18 {
19   @Test
20   public void test()
21   {
22     DeduplicationTransformer transformer = new DeduplicationTransformer();
23     MockProcessorContext context = new MockProcessorContext();
24     KeyValueStore<Integer, Long> store =
25         Stores
26             .keyValueStoreBuilder(
27                 Stores.inMemoryKeyValueStore(DeduplicationTransformer.STORE),
28                 Serdes.Integer(),
29                 Serdes.Long())
30             .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
31             .build();
32     store.init(context, store);
33     context.register(store, null);
34     transformer.init(context);
35     context.setTopic("foo");
36
37     Iterator<String> transformed;
38
39     context.setPartition(0);
40     context.setOffset(1);
41     transformed = transformer.transform("1", "1").iterator();
42     assertThat(transformed.hasNext()).isTrue();
43     assertThat(transformed.next()).isEqualTo("1");
44     assertThat(transformed.hasNext()).isFalse();
45     assertThat(store.get(0)).isEqualTo(1l);
46
47     context.setPartition(1);
48     context.setOffset(1);
49     transformed = transformer.transform("2", "2").iterator();
50     assertThat(transformed.hasNext()).isTrue();
51     assertThat(transformed.next()).isEqualTo("2");
52     assertThat(transformed.hasNext()).isFalse();
53     assertThat(store.get(0)).isEqualTo(1l);
54     assertThat(store.get(1)).isEqualTo(2l);
55
56     context.setPartition(0);
57     context.setOffset(2);
58     transformed = transformer.transform("1", "1").iterator();
59     assertThat(transformed.hasNext()).isFalse();
60     assertThat(store.get(0)).isEqualTo(1l);
61     assertThat(store.get(1)).isEqualTo(2l);
62
63     context.setPartition(0);
64     context.setOffset(3);
65     transformed = transformer.transform("1", "4").iterator();
66     assertThat(transformed.hasNext()).isTrue();
67     assertThat(transformed.next()).isEqualTo("4");
68     assertThat(transformed.hasNext()).isFalse();
69     assertThat(store.get(0)).isEqualTo(4l);
70     assertThat(store.get(1)).isEqualTo(2l);
71
72     // The order is only guaranteed per partition!
73     context.setPartition(2);
74     context.setOffset(1);
75     transformed = transformer.transform("3", "3").iterator();
76     assertThat(transformed.hasNext()).isTrue();
77     assertThat(transformed.next()).isEqualTo("3");
78     assertThat(transformed.hasNext()).isFalse();
79     assertThat(store.get(0)).isEqualTo(4l);
80     assertThat(store.get(1)).isEqualTo(2l);
81     assertThat(store.get(2)).isEqualTo(3l);
82
83     context.setPartition(1);
84     context.setOffset(2);
85     transformed = transformer.transform("2", "2").iterator();
86     assertThat(transformed.hasNext()).isFalse();
87     assertThat(store.get(0)).isEqualTo(4l);
88     assertThat(store.get(1)).isEqualTo(2l);
89     assertThat(store.get(2)).isEqualTo(3l);
90
91     context.setPartition(2);
92     context.setOffset(2);
93     transformed = transformer.transform("3", "5").iterator();
94     assertThat(transformed.hasNext()).isTrue();
95     assertThat(transformed.next()).isEqualTo("5");
96     assertThat(transformed.hasNext()).isFalse();
97     assertThat(store.get(0)).isEqualTo(4l);
98     assertThat(store.get(1)).isEqualTo(2l);
99     assertThat(store.get(2)).isEqualTo(5l);
100
101     // The order is only guaranteed per partition!
102     context.setPartition(1);
103     context.setOffset(3);
104     transformed = transformer.transform("2", "6").iterator();
105     assertThat(transformed.hasNext()).isTrue();
106     assertThat(transformed.next()).isEqualTo("6");
107     assertThat(transformed.hasNext()).isFalse();
108     assertThat(store.get(0)).isEqualTo(4l);
109     assertThat(store.get(1)).isEqualTo(6l);
110     assertThat(store.get(2)).isEqualTo(5l);
111   }
112 }