top10: 1.4.2 - RocksDB does not work in Alpine-Linux
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestRanking;
6 import de.juplo.kafka.wordcount.query.TestUser;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterEach;
15 import org.junit.jupiter.api.BeforeEach;
16 import org.junit.jupiter.api.Test;
17 import org.springframework.kafka.support.serializer.JsonDeserializer;
18 import org.springframework.kafka.support.serializer.JsonSerializer;
19 import org.springframework.util.LinkedMultiValueMap;
20 import org.springframework.util.MultiValueMap;
21
22 import java.util.Map;
23
24 import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
25
26
27 @Slf4j
28 public class Top10StreamProcessorTopologyTest
29 {
30   public static final String IN = "TEST-IN";
31   public static final String OUT = "TEST-OUT";
32   public static final String STORE_NAME = "TOPOLOGY-TEST";
33
34
35   TopologyTestDriver testDriver;
36   TestInputTopic<TestWord, TestCounter> in;
37   TestOutputTopic<TestUser, TestRanking> out;
38
39
40   @BeforeEach
41   public void setUp()
42   {
43     Topology topology = Top10StreamProcessor.buildTopology(
44         IN,
45         OUT,
46         Stores.inMemoryKeyValueStore(STORE_NAME));
47
48     testDriver = new TopologyTestDriver(topology, serializationConfig());
49
50     in = testDriver.createInputTopic(
51         IN,
52         jsonSerializer(TestWord.class, true),
53         jsonSerializer(TestCounter.class,false));
54
55     out = testDriver.createOutputTopic(
56         OUT,
57         new JsonDeserializer()
58             .copyWithType(TestUser.class)
59             .ignoreTypeHeaders(),
60         new JsonDeserializer()
61             .copyWithType(TestRanking.class)
62             .ignoreTypeHeaders());
63
64   }
65
66
67   @Test
68   public void test()
69   {
70     TestData
71         .getInputMessages()
72         .forEach(kv -> in.pipeInput(kv.key, kv.value));
73
74     MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
75     out
76         .readRecordsToList()
77         .forEach(record -> receivedMessages.add(record.key(), record.value()));
78
79     TestData.assertExpectedMessages(receivedMessages);
80
81     TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
82     TestData.assertExpectedLastMessagesForUsers(receivedMessages);
83
84     KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
85     TestData.assertExpectedState(store);
86   }
87
88   @AfterEach
89   public void tearDown()
90   {
91     testDriver.close();
92   }
93
94   private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
95   {
96     JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
97     jsonSerializer.configure(
98         Map.of(
99             JsonSerializer.TYPE_MAPPINGS,
100             "word:" + TestWord.class.getName() + "," +
101             "counter:" + TestCounter.class.getName()),
102         isKey);
103     return jsonSerializer;
104   }
105 }