1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.streams.TestInputTopic;
7 import org.apache.kafka.streams.TestOutputTopic;
8 import org.apache.kafka.streams.Topology;
9 import org.apache.kafka.streams.TopologyTestDriver;
10 import org.apache.kafka.streams.state.KeyValueStore;
11 import org.apache.kafka.streams.state.Stores;
12 import org.junit.jupiter.api.AfterEach;
13 import org.junit.jupiter.api.BeforeEach;
14 import org.junit.jupiter.api.Test;
15 import org.springframework.kafka.support.serializer.JsonDeserializer;
16 import org.springframework.kafka.support.serializer.JsonSerializer;
17 import org.springframework.util.LinkedMultiValueMap;
18 import org.springframework.util.MultiValueMap;
21 import java.util.stream.Stream;
23 import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
27 public class Top10StreamProcessorTopologyTest
29 public static final String IN = "TEST-IN";
30 public static final String OUT = "TEST-OUT";
31 public static final String STORE_NAME = "TOPOLOGY-TEST";
34 TopologyTestDriver testDriver;
35 TestInputTopic<Key, Entry> in;
36 TestOutputTopic<User, Ranking> out;
42 Topology topology = Top10StreamProcessor.buildTopology(
45 Stores.inMemoryKeyValueStore(STORE_NAME));
47 testDriver = new TopologyTestDriver(topology, serializationConfig());
49 in = testDriver.createInputTopic(
51 jsonSerializer(Key.class, true),
52 jsonSerializer(Entry.class,false));
54 out = testDriver.createOutputTopic(
56 new JsonDeserializer()
57 .copyWithType(User.class)
59 new JsonDeserializer()
60 .copyWithType(Ranking.class)
61 .ignoreTypeHeaders());
70 .of(TestData.INPUT_MESSAGES)
71 .forEach(kv -> in.pipeInput(
72 Key.of(kv.key.getUser(), kv.key.getWord()),
73 Entry.of(kv.value.getWord(), kv.value.getCounter())));
75 MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
78 .forEach(record -> receivedMessages.add(record.key(), record.value()));
80 TestData.assertExpectedMessages(receivedMessages);
82 TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
83 TestData.assertExpectedLastMessagesForUsers(receivedMessages);
85 KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
86 TestData.assertExpectedState(store);
90 public void tearDown()
95 private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
97 JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
98 jsonSerializer.configure(
100 JsonSerializer.TYPE_MAPPINGS,
101 "word:" + TestWord.class.getName() + "," +
102 "counter:" + TestCounter.class.getName()),
104 return jsonSerializer;