counter: 1.3.1 - Refined `CounterStreamProcessorTopologyTest`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import de.juplo.kafka.wordcount.splitter.TestInputUser;
4 import de.juplo.kafka.wordcount.splitter.TestInputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWord;
6 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterEach;
15 import org.junit.jupiter.api.BeforeEach;
16 import org.junit.jupiter.api.Test;
17 import org.springframework.kafka.support.serializer.JsonDeserializer;
18 import org.springframework.kafka.support.serializer.JsonSerializer;
19 import org.springframework.util.LinkedMultiValueMap;
20 import org.springframework.util.MultiValueMap;
21
22 import java.util.Map;
23 import java.util.stream.Collectors;
24
25 import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
26 import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
27
28
29 @Slf4j
30 public class CounterStreamProcessorTopologyTest
31 {
32   public static final String IN = "TEST-IN";
33   public static final String OUT = "TEST-OUT";
34
35
36   TopologyTestDriver testDriver;
37   TestInputTopic<TestInputUser, TestInputWord> in;
38   TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
39
40
41   @BeforeEach
42   public void setUpTestDriver()
43   {
44     Topology topology = CounterStreamProcessor.buildTopology(
45         IN,
46         OUT,
47         Stores.inMemoryKeyValueStore(STORE_NAME));
48
49     testDriver = new TopologyTestDriver(topology, serializationConfig());
50
51     in = testDriver.createInputTopic(IN, serializer(), serializer());
52     out = testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
53   }
54
55
56   @Test
57   public void test()
58   {
59     TestData
60         .getInputMessages()
61         .forEach(kv -> in.pipeInput(kv.key, kv.value));
62
63     MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
64     out
65         .readRecordsToList()
66         .forEach(record -> receivedMessages.add(record.key(), record.value()));
67
68     TestData.assertExpectedMessages(receivedMessages);
69
70     TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
71     TestData.assertExpectedLastMessagesForWord(receivedMessages);
72
73     KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
74     TestData.assertExpectedState(store);
75   }
76
77   @AfterEach
78   public void tearDown()
79   {
80     testDriver.close();
81   }
82
83
84   private static JsonSerializer serializer()
85   {
86     return new JsonSerializer().noTypeInfo();
87   }
88
89   private JsonDeserializer<TestOutputWord> keyDeserializer()
90   {
91     return deserializer(true);
92   }
93
94   private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
95   {
96     return deserializer(false);
97   }
98
99   private static <T> JsonDeserializer<T> deserializer(boolean isKey)
100   {
101     JsonDeserializer<T> deserializer = new JsonDeserializer<>();
102     deserializer.configure(
103         Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
104         isKey);
105     return deserializer;
106   }
107
108   private static String typeMappingsConfig()
109   {
110     return typeMappings()
111         .entrySet()
112         .stream()
113         .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
114         .collect(Collectors.joining(","));
115   }
116
117   private static Map<String, Class> typeMappings()
118   {
119     return Map.of(
120         "word", TestOutputWord.class,
121         "counter", TestOutputWordCounter.class);
122   }
123 }