counter: 1.2.15 - `TestData` only holds and asserts the test-data
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.TestInputTopic;
5 import org.apache.kafka.streams.TestOutputTopic;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.TopologyTestDriver;
8 import org.apache.kafka.streams.state.Stores;
9 import org.junit.jupiter.api.AfterEach;
10 import org.junit.jupiter.api.BeforeEach;
11 import org.junit.jupiter.api.Test;
12 import org.springframework.kafka.support.serializer.JsonDeserializer;
13 import org.springframework.kafka.support.serializer.JsonSerde;
14 import org.springframework.kafka.support.serializer.JsonSerializer;
15 import org.springframework.util.LinkedMultiValueMap;
16 import org.springframework.util.MultiValueMap;
17
18 import java.util.Map;
19 import java.util.Properties;
20 import java.util.stream.Stream;
21
22 import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
23 import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
24 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
25 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
26
27
28 @Slf4j
29 public class CounterStreamProcessorTopologyTest
30 {
31   public final static String IN = "TEST-IN";
32   public final static String OUT = "TEST-OUT";
33
34
35   TopologyTestDriver testDriver;
36   TestInputTopic<String, Word> in;
37   TestOutputTopic<Word, WordCounter> out;
38
39
40   @BeforeEach
41   public void setUpTestDriver()
42   {
43     Topology topology = CounterStreamProcessor.buildTopology(
44         IN,
45         OUT,
46         Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
47
48     CounterApplicationConfiguriation applicationConfiguriation =
49         new CounterApplicationConfiguriation();
50     Properties streamProcessorProperties =
51         applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
52     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
53
54     JsonSerde<?> keySerde = new JsonSerde<>();
55     keySerde.configure(propertyMap, true);
56     JsonSerde<?> valueSerde = new JsonSerde<>();
57     valueSerde.configure(propertyMap, false);
58
59     testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
60
61     in = testDriver.createInputTopic(
62         IN,
63         (JsonSerializer<String>)keySerde.serializer(),
64         (JsonSerializer<Word>)valueSerde.serializer());
65
66     out = testDriver.createOutputTopic(
67         OUT,
68         (JsonDeserializer<Word>)keySerde.deserializer(),
69         (JsonDeserializer<WordCounter>)valueSerde.deserializer());
70   }
71
72
73   @Test
74   public void test()
75   {
76     Stream
77         .of(TestData.INPUT_MESSAGES)
78         .forEach(word -> in.pipeInput(word.getUser(), word));
79
80     MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
81     out
82         .readRecordsToList()
83         .forEach(record ->
84         {
85           log.debug(
86               "OUT: {} -> {}, {}, {}",
87               record.key(),
88               record.value(),
89               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
90               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
91           receivedMessages.add(record.key(), record.value());
92         });
93
94     TestData.assertExpectedMessages(receivedMessages);
95   }
96
97   @AfterEach
98   public void tearDown()
99   {
100     testDriver.close();
101   }
102 }