1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestRanking;
6 import de.juplo.kafka.wordcount.query.TestUser;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterEach;
15 import org.junit.jupiter.api.BeforeEach;
16 import org.junit.jupiter.api.Test;
17 import org.springframework.kafka.support.serializer.JsonDeserializer;
18 import org.springframework.kafka.support.serializer.JsonSerializer;
19 import org.springframework.util.LinkedMultiValueMap;
20 import org.springframework.util.MultiValueMap;
24 import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
/**
 * Exercises the topology returned by {@code Top10StreamProcessor.buildTopology(...)}
 * through a {@link TopologyTestDriver}: JSON-serialized test input is piped in,
 * the emitted records are collected per key, and both the output and the
 * state store are checked with the assertions provided by {@code TestData}.
 */
28 public class Top10StreamProcessorTopologyTest
// STORE_NAME names the in-memory state store created and looked up below.
// NOTE(review): IN and OUT look like the input/output topic names — confirm against their usages.
30 public static final String IN = "TEST-IN";
31 public static final String OUT = "TEST-OUT";
32 public static final String STORE_NAME = "TOPOLOGY-TEST";
// Test driver and the typed test topics; all three are assigned when the topology is built.
35 TopologyTestDriver testDriver;
36 TestInputTopic<TestWord, TestCounter> in;
37 TestOutputTopic<TestUser, TestRanking> out;
// Build the topology under test, backed by an in-memory key-value store named STORE_NAME.
43 Topology topology = Top10StreamProcessor.buildTopology(
46 Stores.inMemoryKeyValueStore(STORE_NAME));
// The driver is configured with serializationConfig() from Top10ApplicationConfiguration.
48 testDriver = new TopologyTestDriver(topology, serializationConfig());
// Input topic: JSON serializers for TestWord (created with isKey=true) and TestCounter (isKey=false).
50 in = testDriver.createInputTopic(
52 jsonSerializer(TestWord.class, true),
53 jsonSerializer(TestCounter.class,false));
// Output topic: JSON deserializers retargeted to TestUser and TestRanking via copyWithType(...).
// NOTE(review): JsonDeserializer is instantiated as a raw type; `new JsonDeserializer<>()`
// would avoid raw-type/unchecked warnings — confirm the chained calls still infer the intended types.
55 out = testDriver.createOutputTopic(
57 new JsonDeserializer()
58 .copyWithType(TestUser.class)
60 new JsonDeserializer()
61 .copyWithType(TestRanking.class)
62 .ignoreTypeHeaders());
// Feed every input key/value pair into the input topic.
72 .forEach(kv -> in.pipeInput(kv.key, kv.value));
// Group the received rankings by their user key so they can be asserted per user.
74 MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
77 .forEach(record -> receivedMessages.add(record.key(), record.value()));
// Check the collected output with the assertions defined in TestData.
79 TestData.assertExpectedMessages(receivedMessages);
81 TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
82 TestData.assertExpectedLastMessagesForUsers(receivedMessages);
// Fetch the state store by name from the driver and check its contents as well.
84 KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
85 TestData.assertExpectedState(store);
// NOTE(review): tear-down hook; presumably releases testDriver — confirm.
89 public void tearDown()
/**
 * Creates a {@link JsonSerializer} and configures it with type mappings that
 * map the token {@code word} to {@code TestWord} and {@code counter} to
 * {@code TestCounter}.
 *
 * @param type  class token for {@code T}; presumably only fixes the type
 *              parameter at the call site — TODO confirm
 * @param isKey whether the serializer is meant for record keys; presumably
 *              forwarded to {@code configure(...)} — TODO confirm
 * @param <T>   type handled by the returned serializer
 * @return the configured serializer
 */
94 private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
96 JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
97 jsonSerializer.configure(
// Both mappings are joined into a single comma-separated "token:className" string.
99 JsonSerializer.TYPE_MAPPINGS,
100 "word:" + TestWord.class.getName() + "," +
101 "counter:" + TestCounter.class.getName()),
103 return jsonSerializer;