1 package de.juplo.demo.kafka.deduplication;
3 import org.apache.kafka.common.serialization.Serdes;
4 import org.apache.kafka.streams.processor.MockProcessorContext;
5 import org.apache.kafka.streams.state.KeyValueStore;
6 import org.apache.kafka.streams.state.Stores;
7 import org.junit.jupiter.api.Test;
8 import org.junit.jupiter.api.extension.ExtendWith;
9 import org.springframework.test.context.junit.jupiter.SpringExtension;
11 import java.util.Iterator;
13 import static org.assertj.core.api.Assertions.assertThat;
16 @ExtendWith(SpringExtension.class)
17 public class DeduplicationTransformerIT
22 DeduplicationTransformer transformer = new DeduplicationTransformer();
23 MockProcessorContext context = new MockProcessorContext();
24 KeyValueStore<Integer, Long> store =
26 .keyValueStoreBuilder(
27 Stores.inMemoryKeyValueStore(DeduplicationTransformer.STORE),
30 .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
32 store.init(context, store);
33 context.register(store, null);
34 transformer.init(context);
35 context.setTopic("foo");
38 Iterator<String> transformed;
40 context.setPartition(0);
41 transformed = transformer.transform("1").iterator();
42 assertThat(transformed.hasNext()).isTrue();
43 assertThat(transformed.next()).isEqualTo("1");
44 assertThat(transformed.hasNext()).isFalse();
45 assertThat(store.get(0)).isEqualTo(1l);
47 context.setPartition(1);
48 transformed = transformer.transform("2").iterator();
49 assertThat(transformed.hasNext()).isTrue();
50 assertThat(transformed.next()).isEqualTo("2");
51 assertThat(transformed.hasNext()).isFalse();
52 assertThat(store.get(0)).isEqualTo(1l);
53 assertThat(store.get(1)).isEqualTo(2l);
55 context.setPartition(0);
56 transformed = transformer.transform("1").iterator();
57 assertThat(transformed.hasNext()).isFalse();
58 assertThat(store.get(0)).isEqualTo(1l);
59 assertThat(store.get(1)).isEqualTo(2l);
61 context.setPartition(0);
62 transformed = transformer.transform("4").iterator();
63 assertThat(transformed.hasNext()).isTrue();
64 assertThat(transformed.next()).isEqualTo("4");
65 assertThat(transformed.hasNext()).isFalse();
66 assertThat(store.get(0)).isEqualTo(4l);
67 assertThat(store.get(1)).isEqualTo(2l);
69 // The order is only guaranteed per partition!
70 context.setPartition(2);
71 transformed = transformer.transform("3").iterator();
72 assertThat(transformed.hasNext()).isTrue();
73 assertThat(transformed.next()).isEqualTo("3");
74 assertThat(transformed.hasNext()).isFalse();
75 assertThat(store.get(0)).isEqualTo(4l);
76 assertThat(store.get(1)).isEqualTo(2l);
77 assertThat(store.get(2)).isEqualTo(3l);
79 context.setPartition(1);
80 transformed = transformer.transform("2").iterator();
81 assertThat(transformed.hasNext()).isFalse();
82 assertThat(store.get(0)).isEqualTo(4l);
83 assertThat(store.get(1)).isEqualTo(2l);
84 assertThat(store.get(2)).isEqualTo(3l);
86 context.setPartition(2);
87 transformed = transformer.transform("5").iterator();
88 assertThat(transformed.hasNext()).isTrue();
89 assertThat(transformed.next()).isEqualTo("5");
90 assertThat(transformed.hasNext()).isFalse();
91 assertThat(store.get(0)).isEqualTo(4l);
92 assertThat(store.get(1)).isEqualTo(2l);
93 assertThat(store.get(2)).isEqualTo(5l);
95 // The order is only guaranteed per partition!
96 context.setPartition(1);
97 transformed = transformer.transform("6").iterator();
98 assertThat(transformed.hasNext()).isTrue();
99 assertThat(transformed.next()).isEqualTo("6");
100 assertThat(transformed.hasNext()).isFalse();
101 assertThat(store.get(0)).isEqualTo(4l);
102 assertThat(store.get(1)).isEqualTo(6l);
103 assertThat(store.get(2)).isEqualTo(5l);