popular: 1.2.0 - Refined `WindowedWord` (timestamp as string)
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.popular;
2
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterAll;
15 import org.junit.jupiter.api.BeforeAll;
16 import org.junit.jupiter.api.DisplayName;
17 import org.junit.jupiter.api.Test;
18 import org.springframework.kafka.support.serializer.JsonDeserializer;
19 import org.springframework.kafka.support.serializer.JsonSerializer;
20 import org.springframework.util.LinkedMultiValueMap;
21 import org.springframework.util.MultiValueMap;
22
23 import java.util.Map;
24
25 import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
26 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.*;
27
28
29 @Slf4j
30 public class PopularStreamProcessorTopologyTest
31 {
32   public static final String IN = "TEST-IN";
33   public static final String OUT = "TEST-OUT";
34
35
36   static TopologyTestDriver testDriver;
37   static MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
38
39
40   @BeforeAll
41   public static void setUpTestDriver()
42   {
43     Topology topology = PopularStreamProcessor.buildTopology(
44         IN,
45         OUT,
46         Stores.inMemoryWindowStore(
47             WINDOW_STORE_NAME,
48             WINDOW_SIZE.multipliedBy(2),
49             WINDOW_SIZE,
50             false),
51         Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME));
52
53     testDriver = new TopologyTestDriver(topology, serializationConfig());
54
55     TestInputTopic<InputUser, InputWord> in =
56         testDriver.createInputTopic(IN, serializer(), serializer());
57     TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
58         testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
59
60     TestData.sendInputMessages((instant, kv) -> in.pipeInput(kv.key, kv.value, instant));
61
62     receivedMessages = new LinkedMultiValueMap<>();
63     out
64         .readRecordsToList()
65         .forEach(record -> receivedMessages.add(record.key(), record.value()));
66   }
67
68
69   @DisplayName("Assert the expected output messages")
70   @Test
71   public void testExpectedMessages()
72   {
73     TestData.assertExpectedMessages(receivedMessages);
74   }
75
76   @DisplayName("Assert the expected number of messages")
77   @Test
78   public void testExpectedNumberOfMessagesForWord()
79   {
80     TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
81   }
82
83   @DisplayName("Await the expected final output messages")
84   @Test
85   public void testExpectedLastMessagesForWord()
86   {
87     TestData.assertExpectedLastMessagesForWord(receivedMessages);
88   }
89
90   @DisplayName("Assert the expected state in the state-store")
91   @Test
92   public void testExpectedState()
93   {
94     KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
95     TestData.assertExpectedState(store);
96   }
97
98   @AfterAll
99   public static void tearDown()
100   {
101     testDriver.close();
102   }
103
104
105   private static JsonSerializer serializer()
106   {
107     return new JsonSerializer().noTypeInfo();
108   }
109
110   private static JsonDeserializer<OutputWindowedWord> keyDeserializer()
111   {
112     return deserializer(true);
113   }
114
115   private static JsonDeserializer<OutputWordCounter> valueDeserializer()
116   {
117     return deserializer(false);
118   }
119
120   private static <T> JsonDeserializer<T> deserializer(boolean isKey)
121   {
122     JsonDeserializer<T> deserializer = new JsonDeserializer<>();
123     deserializer.configure(
124         Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
125         isKey);
126     return deserializer;
127   }
128
129   private static String typeMappingsConfig()
130   {
131     return PopularStreamProcessor.typeMappingsConfig(OutputWindowedWord.class, OutputWordCounter.class);
132   }
133 }