top10: 1.3.0 - Refined input JSON to match the new general stats-format
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestRanking;
6 import de.juplo.kafka.wordcount.query.TestUser;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterAll;
15 import org.junit.jupiter.api.BeforeAll;
16 import org.junit.jupiter.api.DisplayName;
17 import org.junit.jupiter.api.Test;
18 import org.springframework.kafka.support.serializer.JsonDeserializer;
19 import org.springframework.kafka.support.serializer.JsonSerializer;
20 import org.springframework.util.LinkedMultiValueMap;
21 import org.springframework.util.MultiValueMap;
22
23 import java.util.Map;
24
25 import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
26 import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
27
28
29 @Slf4j
30 public class Top10StreamProcessorTopologyTest
31 {
32   public static final String IN = "TEST-IN";
33   public static final String OUT = "TEST-OUT";
34
35   static TopologyTestDriver testDriver;
36   static MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
37
38
39   @BeforeAll
40   public static void setUp()
41   {
42     Topology topology = Top10StreamProcessor.buildTopology(
43         IN,
44         OUT,
45         Stores.inMemoryKeyValueStore(STORE_NAME));
46
47     testDriver = new TopologyTestDriver(topology, serializationConfig());
48
49     TestInputTopic<TestWord, TestCounter> in = testDriver.createInputTopic(
50         IN,
51         jsonSerializer(TestWord.class, true),
52         jsonSerializer(TestCounter.class,false));
53
54     TestOutputTopic<TestUser, TestRanking> out = testDriver.createOutputTopic(
55         OUT,
56         new JsonDeserializer()
57             .copyWithType(TestUser.class)
58             .ignoreTypeHeaders(),
59         new JsonDeserializer()
60             .copyWithType(TestRanking.class)
61             .ignoreTypeHeaders());
62
63     TestData
64         .getInputMessages()
65         .forEach(kv -> in.pipeInput(kv.key, kv.value));
66
67     out
68         .readRecordsToList()
69         .forEach(record -> receivedMessages.add(record.key(), record.value()));
70   }
71
72
73   @DisplayName("Assert the expected output messages")
74   @Test
75   public void testExpectedMessages()
76   {
77     TestData.assertExpectedMessages(receivedMessages);
78   }
79
80   @DisplayName("Assert the expected number of messages")
81   @Test
82   public void testExpectedNumberOfMessagesForUsers()
83   {
84     TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
85   }
86
87   @DisplayName("Assert the expected final output messages")
88   @Test
89   public void testExpectedLastMessagesForUSers()
90   {
91     TestData.assertExpectedLastMessagesForUsers(receivedMessages);
92   }
93
94   @DisplayName("Assert the expected state in the state-store")
95   @Test
96   public void testExpectedState()
97   {
98     KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
99     TestData.assertExpectedState(store);
100   }
101
102   @AfterAll
103   public static void tearDown()
104   {
105     testDriver.close();
106   }
107
108   private static <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
109   {
110     JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
111     jsonSerializer.configure(
112         Map.of(
113             JsonSerializer.TYPE_MAPPINGS,
114             "key:" + TestWord.class.getName() + "," +
115             "counter:" + TestCounter.class.getName()),
116         isKey);
117     return jsonSerializer;
118   }
119 }