23ab5240c38f85948e1cbbf81df40811c283b71c
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.popular;
2
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterAll;
15 import org.junit.jupiter.api.BeforeAll;
16 import org.junit.jupiter.api.DisplayName;
17 import org.junit.jupiter.api.Test;
18 import org.springframework.kafka.support.serializer.JsonDeserializer;
19 import org.springframework.kafka.support.serializer.JsonSerializer;
20 import org.springframework.util.LinkedMultiValueMap;
21 import org.springframework.util.MultiValueMap;
22
23 import java.time.Duration;
24 import java.time.ZoneId;
25 import java.util.Map;
26
27 import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
28 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.*;
29
30
31 @Slf4j
32 public class PopularStreamProcessorTopologyTest
33 {
34   public static final String IN = "TEST-IN";
35   public static final String OUT = "TEST-OUT";
36   public static final ZoneId ZONE = ZoneId.of("Europe/Berlin");
37
38
39   static TopologyTestDriver testDriver;
40   static MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
41
42
43   @BeforeAll
44   public static void setUpTestDriver()
45   {
46     Topology topology = PopularStreamProcessor.buildTopology(
47         IN,
48         OUT,
49         ZONE,
50         Stores.inMemoryWindowStore(
51             WINDOW_STORE_NAME,
52             WINDOW_SIZE.multipliedBy(2),
53             WINDOW_SIZE,
54             false),
55         Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME));
56
57     testDriver = new TopologyTestDriver(topology, serializationConfig());
58
59     TestInputTopic<InputUser, InputWord> in =
60         testDriver.createInputTopic(IN, serializer(), serializer());
61     TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
62         testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
63
64     TestData.sendInputMessages((instant, kv) -> in.pipeInput(kv.key, kv.value, instant));
65
66     receivedMessages = new LinkedMultiValueMap<>();
67     out
68         .readRecordsToList()
69         .forEach(record -> receivedMessages.add(record.key(), record.value()));
70   }
71
72
73   @DisplayName("Assert the expected output messages")
74   @Test
75   public void testExpectedMessages()
76   {
77     TestData.assertExpectedMessages(receivedMessages);
78   }
79
80   @DisplayName("Assert the expected number of messages")
81   @Test
82   public void testExpectedNumberOfMessagesForWord()
83   {
84     TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
85   }
86
87   @DisplayName("Await the expected final output messages")
88   @Test
89   public void testExpectedLastMessagesForWord()
90   {
91     TestData.assertExpectedLastMessagesForWord(receivedMessages);
92   }
93
94   @DisplayName("Assert the expected state in the state-store")
95   @Test
96   public void testExpectedState()
97   {
98     KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
99     TestData.assertExpectedState(store);
100   }
101
102   @AfterAll
103   public static void tearDown()
104   {
105     testDriver.close();
106   }
107
108
109   private static JsonSerializer serializer()
110   {
111     return new JsonSerializer().noTypeInfo();
112   }
113
114   private static JsonDeserializer<OutputWindowedWord> keyDeserializer()
115   {
116     return deserializer(true);
117   }
118
119   private static JsonDeserializer<OutputWordCounter> valueDeserializer()
120   {
121     return deserializer(false);
122   }
123
124   private static <T> JsonDeserializer<T> deserializer(boolean isKey)
125   {
126     JsonDeserializer<T> deserializer = new JsonDeserializer<>();
127     deserializer.configure(
128         Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
129         isKey);
130     return deserializer;
131   }
132
133   private static String typeMappingsConfig()
134   {
135     return PopularStreamProcessor.typeMappingsConfig(OutputWindowedWord.class, OutputWordCounter.class);
136   }
137 }