popular: 1.0.0 - Word are counted for hopping time-windows
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.popular;
2
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterAll;
15 import org.junit.jupiter.api.BeforeAll;
16 import org.junit.jupiter.api.DisplayName;
17 import org.junit.jupiter.api.Test;
18 import org.springframework.kafka.support.serializer.JsonDeserializer;
19 import org.springframework.kafka.support.serializer.JsonSerializer;
20 import org.springframework.util.LinkedMultiValueMap;
21 import org.springframework.util.MultiValueMap;
22
23 import java.time.Duration;
24 import java.time.ZoneId;
25 import java.util.Map;
26
27 import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
28 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
29 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
30
31
32 @Slf4j
33 public class PopularStreamProcessorTopologyTest
34 {
35   public static final String IN = "TEST-IN";
36   public static final String OUT = "TEST-OUT";
37   public static final ZoneId ZONE = ZoneId.of("Europe/Berlin");
38
39
40   static TopologyTestDriver testDriver;
41   static MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
42
43
44   @BeforeAll
45   public static void setUpTestDriver()
46   {
47     Topology topology = PopularStreamProcessor.buildTopology(
48         IN,
49         OUT,
50         ZONE,
51         Stores.inMemoryWindowStore(
52             WINDOW_STORE_NAME,
53             Duration.ofSeconds(60),
54             Duration.ofSeconds(30),
55             false),
56         Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME));
57
58     testDriver = new TopologyTestDriver(topology, serializationConfig());
59
60     TestInputTopic<InputUser, InputWord> in =
61         testDriver.createInputTopic(IN, serializer(), serializer());
62     TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
63         testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
64
65     TestData.sendInputMessages((instant, kv) -> in.pipeInput(kv.key, kv.value, instant));
66
67     receivedMessages = new LinkedMultiValueMap<>();
68     out
69         .readRecordsToList()
70         .forEach(record -> receivedMessages.add(record.key(), record.value()));
71   }
72
73
74   @DisplayName("Assert the expected output messages")
75   @Test
76   public void testExpectedMessages()
77   {
78     TestData.assertExpectedMessages(receivedMessages);
79   }
80
81   @DisplayName("Assert the expected number of messages")
82   @Test
83   public void testExpectedNumberOfMessagesForWord()
84   {
85     TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
86   }
87
88   @DisplayName("Await the expected final output messages")
89   @Test
90   public void testExpectedLastMessagesForWord()
91   {
92     TestData.assertExpectedLastMessagesForWord(receivedMessages);
93   }
94
95   @DisplayName("Assert the expected state in the state-store")
96   @Test
97   public void testExpectedState()
98   {
99     KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
100     TestData.assertExpectedState(store);
101   }
102
103   @AfterAll
104   public static void tearDown()
105   {
106     testDriver.close();
107   }
108
109
110   private static JsonSerializer serializer()
111   {
112     return new JsonSerializer().noTypeInfo();
113   }
114
115   private static JsonDeserializer<OutputWindowedWord> keyDeserializer()
116   {
117     return deserializer(true);
118   }
119
120   private static JsonDeserializer<OutputWordCounter> valueDeserializer()
121   {
122     return deserializer(false);
123   }
124
125   private static <T> JsonDeserializer<T> deserializer(boolean isKey)
126   {
127     JsonDeserializer<T> deserializer = new JsonDeserializer<>();
128     deserializer.configure(
129         Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
130         isKey);
131     return deserializer;
132   }
133
134   private static String typeMappingsConfig()
135   {
136     return PopularStreamProcessor.typeMappingsConfig(OutputWindowedWord.class, OutputWordCounter.class);
137   }
138 }