1 package de.juplo.demo.kafka.deduplication;
3 import org.apache.kafka.streams.processor.ProcessorContext;
4 import org.apache.kafka.streams.state.KeyValueStore;
5 import org.junit.jupiter.api.BeforeEach;
6 import org.junit.jupiter.api.Test;
7 import org.junit.jupiter.api.extension.ExtendWith;
8 import org.junit.jupiter.params.ParameterizedTest;
9 import org.junit.jupiter.params.provider.ValueSource;
10 import org.springframework.boot.test.mock.mockito.MockBean;
11 import org.springframework.test.context.junit.jupiter.SpringExtension;
13 import java.util.Iterator;
15 import static org.assertj.core.api.Assertions.assertThat;
16 import static org.mockito.ArgumentMatchers.anyInt;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.Mockito.*;
21 @ExtendWith(SpringExtension.class)
22 public class DeduplicationTransformerTest
25 ProcessorContext context;
27 KeyValueStore<Integer, Long> store;
29 DeduplicationTransformer transformer = new DeduplicationTransformer();
33 public void setUpTransformer()
35 when(context.getStateStore(DeduplicationTransformer.STORE)).thenReturn(store);
36 transformer.init(context);
41 public void testStoresSequenceNumberAndForwardesValueIfStoreIsEmpty()
43 when(store.get(anyInt())).thenReturn(null);
44 when(context.partition()).thenReturn(0);
46 Iterator<String> result = transformer.transform("1").iterator();
48 assertThat(result.hasNext()).isTrue();
49 assertThat(result.next()).isEqualTo("1");
50 assertThat(result.hasNext()).isFalse();
51 verify(store, atLeastOnce()).put(eq(0), eq(1l));
55 @ValueSource(longs = { 2, 3, 4, 5, 6, 7 })
56 public void testStoresSequenceNumberAndForwardesValueIfSequenceNumberIsGreater(long sequenceNumber)
58 String value = Long.toString(sequenceNumber);
60 when(store.get(anyInt())).thenReturn(1l);
61 when(context.partition()).thenReturn(0);
63 Iterator<String> result = transformer.transform(value).iterator();
65 assertThat(result.hasNext()).isTrue();
66 assertThat(result.next()).isEqualTo(value);
67 assertThat(result.hasNext()).isFalse();
68 verify(store, atLeastOnce()).put(eq(0), eq(sequenceNumber));
72 @ValueSource(longs = { 1, 2, 3, 4, 5, 6, 7 })
73 public void testDropsValueIfSequenceNumberIsNotGreater(long sequenceNumber)
75 String value = Long.toString(sequenceNumber);
77 when(store.get(anyInt())).thenReturn(7l);
78 when(context.partition()).thenReturn(0);
80 Iterator<String> result = transformer.transform(value).iterator();
82 assertThat(result.hasNext()).isFalse();
83 verify(store, never()).put(eq(0), eq(sequenceNumber));