1 package de.juplo.demo.kafka.deduplication;
3 import org.apache.kafka.common.serialization.Serdes;
4 import org.apache.kafka.streams.processor.MockProcessorContext;
5 import org.apache.kafka.streams.state.KeyValueStore;
6 import org.apache.kafka.streams.state.Stores;
7 import org.junit.jupiter.api.Test;
8 import org.junit.jupiter.api.extension.ExtendWith;
9 import org.springframework.test.context.junit.jupiter.SpringExtension;
11 import java.util.Iterator;
13 import static org.assertj.core.api.Assertions.assertThat;
/**
 * Exercises {@code DeduplicationTransformer} against a {@code MockProcessorContext}
 * and a key-value store created via {@code Stores.inMemoryKeyValueStore}.
 *
 * Each step below selects a partition on the mock context, calls
 * {@code transform(key, value)} and then checks both the returned values and
 * the store entries read with {@code store.get(partition)}.
 *
 * NOTE(review): in this listing the store-builder statement has no receiver and
 * no terminating call, and no method declaration is visible. The listing looks
 * truncated; verify against the original file before relying on it.
 */
16 @ExtendWith(SpringExtension.class)
17 public class DeduplicationTransformerIT
// Fixture: the transformer under test, the mock context and the store it uses.
22 DeduplicationTransformer transformer = new DeduplicationTransformer();
23 MockProcessorContext context = new MockProcessorContext();
24 KeyValueStore<Integer, Long> store =
26 .keyValueStoreBuilder(
27 Stores.inMemoryKeyValueStore(DeduplicationTransformer.STORE),
30 .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
// The store is initialised with the mock context and registered there before
// the transformer is initialised with the same context.
32 store.init(context, store);
33 context.register(store, null);
34 transformer.init(context);
35 context.setTopic("foo");
// Holds the result of the most recent transform() call.
37 Iterator<String> transformed;
// Partition 0, value "1": exactly one result is expected; entry 0 becomes 1.
39 context.setPartition(0);
41 transformed = transformer.transform("1", "1").iterator();
42 assertThat(transformed.hasNext()).isTrue();
43 assertThat(transformed.next()).isEqualTo("1");
44 assertThat(transformed.hasNext()).isFalse();
45 assertThat(store.get(0)).isEqualTo(1L);
// Partition 1, value "2": one result; entry 0 stays 1, entry 1 becomes 2.
47 context.setPartition(1);
49 transformed = transformer.transform("2", "2").iterator();
50 assertThat(transformed.hasNext()).isTrue();
51 assertThat(transformed.next()).isEqualTo("2");
52 assertThat(transformed.hasNext()).isFalse();
53 assertThat(store.get(0)).isEqualTo(1L);
54 assertThat(store.get(1)).isEqualTo(2L);
// Partition 0, value "1" again: no result is expected and the store is unchanged.
56 context.setPartition(0);
58 transformed = transformer.transform("1", "1").iterator();
59 assertThat(transformed.hasNext()).isFalse();
60 assertThat(store.get(0)).isEqualTo(1L);
61 assertThat(store.get(1)).isEqualTo(2L);
// Partition 0, value "4": one result; entry 0 advances to 4.
63 context.setPartition(0);
65 transformed = transformer.transform("1", "4").iterator();
66 assertThat(transformed.hasNext()).isTrue();
67 assertThat(transformed.next()).isEqualTo("4");
68 assertThat(transformed.hasNext()).isFalse();
69 assertThat(store.get(0)).isEqualTo(4L);
70 assertThat(store.get(1)).isEqualTo(2L);
72 // The order is only guaranteed per partition!
// First call for partition 2: "3" is expected to pass although entry 0 already holds 4.
73 context.setPartition(2);
75 transformed = transformer.transform("3", "3").iterator();
76 assertThat(transformed.hasNext()).isTrue();
77 assertThat(transformed.next()).isEqualTo("3");
78 assertThat(transformed.hasNext()).isFalse();
79 assertThat(store.get(0)).isEqualTo(4L);
80 assertThat(store.get(1)).isEqualTo(2L);
81 assertThat(store.get(2)).isEqualTo(3L);
// Partition 1, value "2" again: no result is expected and the store is unchanged.
83 context.setPartition(1);
85 transformed = transformer.transform("2", "2").iterator();
86 assertThat(transformed.hasNext()).isFalse();
87 assertThat(store.get(0)).isEqualTo(4L);
88 assertThat(store.get(1)).isEqualTo(2L);
89 assertThat(store.get(2)).isEqualTo(3L);
// Partition 2, value "5": one result; entry 2 advances to 5.
91 context.setPartition(2);
93 transformed = transformer.transform("3", "5").iterator();
94 assertThat(transformed.hasNext()).isTrue();
95 assertThat(transformed.next()).isEqualTo("5");
96 assertThat(transformed.hasNext()).isFalse();
97 assertThat(store.get(0)).isEqualTo(4L);
98 assertThat(store.get(1)).isEqualTo(2L);
99 assertThat(store.get(2)).isEqualTo(5L);
101 // The order is only guaranteed per partition!
// Partition 1, value "6": one result; entry 1 advances to 6 while entries 0 and 2 keep their values.
102 context.setPartition(1);
103 context.setOffset(3);
104 transformed = transformer.transform("2", "6").iterator();
105 assertThat(transformed.hasNext()).isTrue();
106 assertThat(transformed.next()).isEqualTo("6");
107 assertThat(transformed.hasNext()).isFalse();
108 assertThat(store.get(0)).isEqualTo(4L);
109 assertThat(store.get(1)).isEqualTo(6L);
110 assertThat(store.get(2)).isEqualTo(5L);