query: 1.0.6 - Added `QueryStreamProcessorTopologyTest`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / query / QueryStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.query;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import de.juplo.kafka.wordcount.top10.TestRanking;
5 import de.juplo.kafka.wordcount.users.TestUserData;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.common.serialization.StringSerializer;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.Topology;
10 import org.apache.kafka.streams.TopologyTestDriver;
11 import org.apache.kafka.streams.state.KeyValueStore;
12 import org.apache.kafka.streams.state.Stores;
13 import org.junit.jupiter.api.AfterEach;
14 import org.junit.jupiter.api.BeforeEach;
15 import org.junit.jupiter.api.Test;
16 import org.springframework.kafka.support.serializer.JsonSerializer;
17
18 import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
19
20
21 @Slf4j
22 public class QueryStreamProcessorTopologyTest
23 {
24   public static final String TOP10_IN = "TOP10-IN";
25   public static final String USERS_IN = "USERS-IN";
26   public static final String STORE_NAME = "TOPOLOGY-TEST";
27
28
29   TopologyTestDriver testDriver;
30   TestInputTopic<String, TestRanking> top10In;
31   TestInputTopic<String, TestUserData> userIn;
32
33
34   @BeforeEach
35   public void setUp()
36   {
37     Topology topology = QueryStreamProcessor.buildTopology(
38         USERS_IN,
39         TOP10_IN,
40         Stores.inMemoryKeyValueStore(STORE_NAME),
41         new ObjectMapper());
42
43     testDriver = new TopologyTestDriver(topology, serializationConfig());
44
45     top10In = testDriver.createInputTopic(
46         TOP10_IN,
47         new StringSerializer(),
48         new JsonSerializer());
49
50     userIn = testDriver.createInputTopic(
51         USERS_IN,
52         new StringSerializer(),
53         new JsonSerializer());
54   }
55
56
57   @Test
58   public void test()
59   {
60     TestData
61         .getUsersMessages()
62         .forEach(kv -> userIn.pipeInput(kv.key, kv.value));
63     TestData
64         .getTop10Messages()
65         .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
66
67     KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
68     TestData.assertExpectedState(store);
69   }
70
71   @AfterEach
72   public void tearDown()
73   {
74     testDriver.close();
75   }
76 }