339a3010111c9d9043f5b407cfe96a6b05478d56
[demos/kafka/deduplication] / src / test / java / de / juplo / demo / kafka / deduplication / DeduplicationTransformerIT.java
1 package de.juplo.demo.kafka.deduplication;
2
3 import org.apache.kafka.common.serialization.Serdes;
4 import org.apache.kafka.streams.processor.MockProcessorContext;
5 import org.apache.kafka.streams.state.KeyValueStore;
6 import org.apache.kafka.streams.state.Stores;
7 import org.junit.jupiter.api.Test;
8 import org.junit.jupiter.api.extension.ExtendWith;
9 import org.springframework.test.context.junit.jupiter.SpringExtension;
10
11 import java.util.Iterator;
12
13 import static org.assertj.core.api.Assertions.assertThat;
14
15
16 @ExtendWith(SpringExtension.class)
17 public class DeduplicationTransformerIT
18 {
19   @Test
20   public void test()
21   {
22     DeduplicationTransformer transformer = new DeduplicationTransformer();
23     MockProcessorContext context = new MockProcessorContext();
24     KeyValueStore<Integer, Long> store =
25         Stores
26             .keyValueStoreBuilder(
27                 Stores.inMemoryKeyValueStore(DeduplicationTransformer.STORE),
28                 Serdes.Integer(),
29                 Serdes.Long())
30             .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
31             .build();
32     store.init(context, store);
33     context.register(store, null);
34     transformer.init(context);
35     context.setTopic("foo");
36     context.setOffset(1);
37
38     Iterator<String> transformed;
39
40     context.setPartition(0);
41     transformed = transformer.transform("1").iterator();
42     assertThat(transformed.hasNext()).isTrue();
43     assertThat(transformed.next()).isEqualTo("1");
44     assertThat(transformed.hasNext()).isFalse();
45     assertThat(store.get(0)).isEqualTo(1l);
46
47     context.setPartition(1);
48     transformed = transformer.transform("2").iterator();
49     assertThat(transformed.hasNext()).isTrue();
50     assertThat(transformed.next()).isEqualTo("2");
51     assertThat(transformed.hasNext()).isFalse();
52     assertThat(store.get(0)).isEqualTo(1l);
53     assertThat(store.get(1)).isEqualTo(2l);
54
55     context.setPartition(0);
56     transformed = transformer.transform("1").iterator();
57     assertThat(transformed.hasNext()).isFalse();
58     assertThat(store.get(0)).isEqualTo(1l);
59     assertThat(store.get(1)).isEqualTo(2l);
60
61     context.setPartition(0);
62     transformed = transformer.transform("4").iterator();
63     assertThat(transformed.hasNext()).isTrue();
64     assertThat(transformed.next()).isEqualTo("4");
65     assertThat(transformed.hasNext()).isFalse();
66     assertThat(store.get(0)).isEqualTo(4l);
67     assertThat(store.get(1)).isEqualTo(2l);
68
69     // The order is only guaranteed per partition!
70     context.setPartition(2);
71     transformed = transformer.transform("3").iterator();
72     assertThat(transformed.hasNext()).isTrue();
73     assertThat(transformed.next()).isEqualTo("3");
74     assertThat(transformed.hasNext()).isFalse();
75     assertThat(store.get(0)).isEqualTo(4l);
76     assertThat(store.get(1)).isEqualTo(2l);
77     assertThat(store.get(2)).isEqualTo(3l);
78
79     context.setPartition(1);
80     transformed = transformer.transform("2").iterator();
81     assertThat(transformed.hasNext()).isFalse();
82     assertThat(store.get(0)).isEqualTo(4l);
83     assertThat(store.get(1)).isEqualTo(2l);
84     assertThat(store.get(2)).isEqualTo(3l);
85
86     context.setPartition(2);
87     transformed = transformer.transform("5").iterator();
88     assertThat(transformed.hasNext()).isTrue();
89     assertThat(transformed.next()).isEqualTo("5");
90     assertThat(transformed.hasNext()).isFalse();
91     assertThat(store.get(0)).isEqualTo(4l);
92     assertThat(store.get(1)).isEqualTo(2l);
93     assertThat(store.get(2)).isEqualTo(5l);
94
95     // The order is only guaranteed per partition!
96     context.setPartition(1);
97     transformed = transformer.transform("6").iterator();
98     assertThat(transformed.hasNext()).isTrue();
99     assertThat(transformed.next()).isEqualTo("6");
100     assertThat(transformed.hasNext()).isFalse();
101     assertThat(store.get(0)).isEqualTo(4l);
102     assertThat(store.get(1)).isEqualTo(6l);
103     assertThat(store.get(2)).isEqualTo(5l);
104   }
105 }