cd09c066ea852b6d6b1503147c535441f716ed63
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestRanking;
6 import de.juplo.kafka.wordcount.query.TestUser;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterEach;
15 import org.junit.jupiter.api.BeforeEach;
16 import org.junit.jupiter.api.Test;
17 import org.springframework.kafka.support.serializer.JsonDeserializer;
18 import org.springframework.kafka.support.serializer.JsonSerializer;
19 import org.springframework.util.LinkedMultiValueMap;
20 import org.springframework.util.MultiValueMap;
21
22 import java.util.Map;
23 import java.util.stream.Stream;
24
25 import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
26
27
28 @Slf4j
29 public class Top10StreamProcessorTopologyTest
30 {
31   public static final String IN = "TEST-IN";
32   public static final String OUT = "TEST-OUT";
33   public static final String STORE_NAME = "TOPOLOGY-TEST";
34
35
36   TopologyTestDriver testDriver;
37   TestInputTopic<TestWord, TestCounter> in;
38   TestOutputTopic<TestUser, TestRanking> out;
39
40
41   @BeforeEach
42   public void setUp()
43   {
44     Topology topology = Top10StreamProcessor.buildTopology(
45         IN,
46         OUT,
47         Stores.inMemoryKeyValueStore(STORE_NAME));
48
49     testDriver = new TopologyTestDriver(topology, serializationConfig());
50
51     in = testDriver.createInputTopic(
52         IN,
53         jsonSerializer(TestWord.class, true),
54         jsonSerializer(TestCounter.class,false));
55
56     out = testDriver.createOutputTopic(
57         OUT,
58         new JsonDeserializer()
59             .copyWithType(TestUser.class)
60             .ignoreTypeHeaders(),
61         new JsonDeserializer()
62             .copyWithType(TestRanking.class)
63             .ignoreTypeHeaders());
64
65   }
66
67
68   @Test
69   public void test()
70   {
71     Stream
72         .of(TestData.INPUT_MESSAGES)
73         .forEach(kv -> in.pipeInput(kv.key, kv.value));
74
75     MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
76     out
77         .readRecordsToList()
78         .forEach(record -> receivedMessages.add(record.key(), record.value()));
79
80     TestData.assertExpectedMessages(receivedMessages);
81
82     TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
83     TestData.assertExpectedLastMessagesForUsers(receivedMessages);
84
85     KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
86     TestData.assertExpectedState(store);
87   }
88
89   @AfterEach
90   public void tearDown()
91   {
92     testDriver.close();
93   }
94
95   private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
96   {
97     JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
98     jsonSerializer.configure(
99         Map.of(
100             JsonSerializer.TYPE_MAPPINGS,
101             "word:" + TestWord.class.getName() + "," +
102             "counter:" + TestCounter.class.getName()),
103         isKey);
104     return jsonSerializer;
105   }
106 }