1 package de.juplo.kafka.wordcount.popular;
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterAll;
15 import org.junit.jupiter.api.BeforeAll;
16 import org.junit.jupiter.api.DisplayName;
17 import org.junit.jupiter.api.Test;
18 import org.springframework.kafka.support.serializer.JsonDeserializer;
19 import org.springframework.kafka.support.serializer.JsonSerializer;
20 import org.springframework.util.LinkedMultiValueMap;
21 import org.springframework.util.MultiValueMap;
23 import java.time.Duration;
24 import java.time.ZoneId;
27 import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
28 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
29 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
/**
 * Exercises the topology built by {@code PopularStreamProcessor.buildTopology}
 * with a {@link TopologyTestDriver}. The driver and the collected output are
 * held in static fields that are filled once in {@code setUpTestDriver()} and
 * then read by the individual test methods.
 */
33 public class PopularStreamProcessorTopologyTest
// Topic name handed to testDriver.createInputTopic(...) in setUpTestDriver().
35 public static final String IN = "TEST-IN";
// Topic name handed to testDriver.createOutputTopic(...) in setUpTestDriver().
36 public static final String OUT = "TEST-OUT";
// Time zone constant; its use is not visible in this excerpt
// (presumably one of the buildTopology arguments - confirm).
37 public static final ZoneId ZONE = ZoneId.of("Europe/Berlin");
// Test driver instance, assigned in setUpTestDriver().
40 static TopologyTestDriver testDriver;
// Output values grouped by output key, in the order they were added;
// re-created and filled in setUpTestDriver().
41 static MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
/**
 * Builds the topology under test, wraps it in a {@link TopologyTestDriver},
 * pipes the test input into the input topic and collects the produced
 * records into {@code receivedMessages}.
 *
 * NOTE(review): some lines of this method are not visible in this excerpt
 * (the leading arguments of buildTopology, the remaining arguments of
 * inMemoryWindowStore and the receiver of the final forEach call).
 */
45 public static void setUpTestDriver()
// Topology under test, backed by in-memory state stores.
47 Topology topology = PopularStreamProcessor.buildTopology(
51 Stores.inMemoryWindowStore(
// Presumably the retention period (60s) and the window size (30s) of the
// window store - the store name and the last argument are not visible; confirm.
53 Duration.ofSeconds(60),
54 Duration.ofSeconds(30),
56 Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME));
// The driver is configured with the application's serializationConfig().
58 testDriver = new TopologyTestDriver(topology, serializationConfig());
// Test topics: JSON serializers for the input, JSON deserializers for the output.
60 TestInputTopic<InputUser, InputWord> in =
61 testDriver.createInputTopic(IN, serializer(), serializer());
62 TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
63 testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
// Each test message is piped into the input topic; the supplied instant is
// passed as the third argument of pipeInput (the record timestamp).
65 TestData.sendInputMessages((instant, kv) -> in.pipeInput(kv.key, kv.value, instant));
// Start from an empty map, then add every record's value under its key.
67 receivedMessages = new LinkedMultiValueMap<>();
// NOTE(review): the receiver of forEach is not visible here - presumably the
// records read from `out`; confirm.
70 .forEach(record -> receivedMessages.add(record.key(), record.value()));
/**
 * Hands the collected output messages to
 * {@code TestData.assertExpectedMessages} for verification.
 */
74 @DisplayName("Assert the expected output messages")
76 public void testExpectedMessages()
78 TestData.assertExpectedMessages(receivedMessages);
/**
 * Hands the collected output messages to
 * {@code TestData.assertExpectedNumberOfMessagesForWord} for verification.
 */
81 @DisplayName("Assert the expected number of messages")
83 public void testExpectedNumberOfMessagesForWord()
85 TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
/**
 * Hands the collected output messages to
 * {@code TestData.assertExpectedLastMessagesForWord} for verification.
 */
88 @DisplayName("Await the expected final output messages")
90 public void testExpectedLastMessagesForWord()
92 TestData.assertExpectedLastMessagesForWord(receivedMessages);
/**
 * Looks up the key/value state store registered under
 * {@code KEY_VALUE_STORE_NAME} from the test driver and hands it to
 * {@code TestData.assertExpectedState} for verification.
 */
95 @DisplayName("Assert the expected state in the state-store")
97 public void testExpectedState()
99 KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
100 TestData.assertExpectedState(store);
// Static tear-down hook of the test class.
// NOTE(review): the body is not visible in this excerpt - presumably it
// closes `testDriver`; confirm.
104 public static void tearDown()
/**
 * Creates a fresh {@link JsonSerializer} with {@code noTypeInfo()} applied.
 * Used for both the key and the value of the test input topic.
 *
 * NOTE(review): the return type is a raw {@code JsonSerializer}; consider
 * adding a type argument.
 */
110 private static JsonSerializer serializer()
112 return new JsonSerializer().noTypeInfo();
/**
 * Returns the deserializer for output keys; delegates to
 * {@code deserializer(true)}.
 */
115 private static JsonDeserializer<OutputWindowedWord> keyDeserializer()
117 return deserializer(true);
/**
 * Returns the deserializer for output values; delegates to
 * {@code deserializer(false)}.
 */
120 private static JsonDeserializer<OutputWordCounter> valueDeserializer()
122 return deserializer(false);
/**
 * Creates a {@link JsonDeserializer} and configures it with the type
 * mappings returned by {@code typeMappingsConfig()}.
 *
 * NOTE(review): the end of the configure(...) call and the return statement
 * are not visible in this excerpt - presumably {@code isKey} is passed as
 * the second argument and the configured deserializer is returned; confirm.
 * Also verify that {@code java.util.Map} is imported (the import is not
 * visible here).
 */
125 private static <T> JsonDeserializer<T> deserializer(boolean isKey)
127 JsonDeserializer<T> deserializer = new JsonDeserializer<>();
128 deserializer.configure(
129 Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
/**
 * Returns the type-mappings string produced by
 * {@code PopularStreamProcessor.typeMappingsConfig} for the two output
 * classes {@code OutputWindowedWord} and {@code OutputWordCounter}.
 */
134 private static String typeMappingsConfig()
136 return PopularStreamProcessor.typeMappingsConfig(OutputWindowedWord.class, OutputWordCounter.class);