1 package de.juplo.kafka.wordcount.counter;
3 import de.juplo.kafka.wordcount.splitter.TestInputUser;
4 import de.juplo.kafka.wordcount.splitter.TestInputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWord;
6 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterAll;
15 import org.junit.jupiter.api.BeforeAll;
16 import org.junit.jupiter.api.DisplayName;
17 import org.junit.jupiter.api.Test;
18 import org.springframework.kafka.support.serializer.JsonDeserializer;
19 import org.springframework.kafka.support.serializer.JsonSerializer;
20 import org.springframework.util.LinkedMultiValueMap;
21 import org.springframework.util.MultiValueMap;
25 import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
26 import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
30 public class CounterStreamProcessorTopologyTest
32 public static final String IN = "TEST-IN";
33 public static final String OUT = "TEST-OUT";
36 static TopologyTestDriver testDriver;
37 static MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
41 public static void setUpTestDriver()
43 Topology topology = CounterStreamProcessor.buildTopology(
46 Stores.inMemoryKeyValueStore(STORE_NAME));
48 testDriver = new TopologyTestDriver(topology, serializationConfig());
50 TestInputTopic<TestInputUser, TestInputWord> in =
51 testDriver.createInputTopic(IN, serializer(), serializer());
52 TestOutputTopic<TestOutputWord, TestOutputWordCounter> out =
53 testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
57 .forEach(kv -> in.pipeInput(kv.key, kv.value));
59 receivedMessages = new LinkedMultiValueMap<>();
62 .forEach(record -> receivedMessages.add(record.key(), record.value()));
66 @DisplayName("Assert the expected output messages")
68 public void testExpectedMessages()
70 TestData.assertExpectedMessages(receivedMessages);
73 @DisplayName("Assert the expected number of messages")
75 public void testExpectedNumberOfMessagesForWord()
77 TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
80 @DisplayName("Await the expected final output messages")
82 public void testExpectedLastMessagesForWord()
84 TestData.assertExpectedLastMessagesForWord(receivedMessages);
87 @DisplayName("Assert the expected state in the state-store")
89 public void testExpectedState()
91 KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
92 TestData.assertExpectedState(store);
96 public static void tearDown()
102 private static JsonSerializer serializer()
104 return new JsonSerializer().noTypeInfo();
107 private static JsonDeserializer<TestOutputWord> keyDeserializer()
109 return deserializer(true);
112 private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
114 return deserializer(false);
117 private static <T> JsonDeserializer<T> deserializer(boolean isKey)
119 JsonDeserializer<T> deserializer = new JsonDeserializer<>();
120 deserializer.configure(
121 Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
126 private static String typeMappingsConfig()
128 return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);