1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestRanking;
6 import de.juplo.kafka.wordcount.query.TestStats;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterAll;
15 import org.junit.jupiter.api.BeforeAll;
16 import org.junit.jupiter.api.DisplayName;
17 import org.junit.jupiter.api.Test;
18 import org.springframework.kafka.support.serializer.JsonDeserializer;
19 import org.springframework.kafka.support.serializer.JsonSerializer;
20 import org.springframework.util.LinkedMultiValueMap;
21 import org.springframework.util.MultiValueMap;
25 import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
26 import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
30 public class Top10StreamProcessorTopologyTest
32 public static final String IN = "TEST-IN";
33 public static final String OUT = "TEST-OUT";
35 static TopologyTestDriver testDriver;
36 static MultiValueMap<TestStats, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
40 public static void setUp()
42 Topology topology = Top10StreamProcessor.buildTopology(
45 Stores.inMemoryKeyValueStore(STORE_NAME));
47 testDriver = new TopologyTestDriver(topology, serializationConfig());
49 TestInputTopic<TestWord, TestCounter> in = testDriver.createInputTopic(
51 jsonSerializer(TestWord.class, true),
52 jsonSerializer(TestCounter.class,false));
54 TestOutputTopic<TestStats, TestRanking> out = testDriver.createOutputTopic(
56 new JsonDeserializer()
57 .copyWithType(TestStats.class)
59 new JsonDeserializer()
60 .copyWithType(TestRanking.class)
61 .ignoreTypeHeaders());
65 .forEach(kv -> in.pipeInput(kv.key, kv.value));
69 .forEach(record -> receivedMessages.add(record.key(), record.value()));
73 @DisplayName("Assert the expected output messages")
75 public void testExpectedMessages()
77 TestData.assertExpectedMessages(receivedMessages);
80 @DisplayName("Assert the expected number of messages")
82 public void testExpectedNumberOfMessages()
84 TestData.assertExpectedNumberOfMessages(receivedMessages);
87 @DisplayName("Assert the expected final output messages")
89 public void testExpectedLastMessages()
91 TestData.assertExpectedLastMessages(receivedMessages);
94 @DisplayName("Assert the expected state in the state-store")
96 public void testExpectedState()
98 KeyValueStore<Stats, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
99 TestData.assertExpectedState(store);
103 public static void tearDown()
108 private static <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
110 JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
111 jsonSerializer.configure(
113 JsonSerializer.TYPE_MAPPINGS,
114 "key:" + TestWord.class.getName() + "," +
115 "counter:" + TestCounter.class.getName()),
117 return jsonSerializer;