01c1cf637351139669177914bbb77d3ae85d9fc7
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.TestInputTopic;
5 import org.apache.kafka.streams.TestOutputTopic;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.TopologyTestDriver;
8 import org.junit.jupiter.api.AfterEach;
9 import org.junit.jupiter.api.BeforeEach;
10 import org.junit.jupiter.api.Test;
11 import org.springframework.kafka.support.serializer.JsonDeserializer;
12 import org.springframework.kafka.support.serializer.JsonSerde;
13 import org.springframework.kafka.support.serializer.JsonSerializer;
14 import org.springframework.util.LinkedMultiValueMap;
15 import org.springframework.util.MultiValueMap;
16
17 import java.util.Map;
18 import java.util.Properties;
19 import java.util.stream.Stream;
20
21 import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
22 import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
23 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
24 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
25
26
27 @Slf4j
28 public class Top10StreamProcessorTopologyTest
29 {
30   public final static String IN = "TEST-IN";
31   public final static String OUT = "TEST-OUT";
32
33
34   TopologyTestDriver testDriver;
35   TestInputTopic<Key, Entry> in;
36   TestOutputTopic<User, Ranking> out;
37
38
39   @BeforeEach
40   public void setUp()
41   {
42     Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
43
44     Top10ApplicationConfiguration applicationConfiguriation =
45         new Top10ApplicationConfiguration();
46     Properties streamProcessorProperties =
47         applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
48     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
49
50     JsonSerde<?> keySerde = new JsonSerde<>();
51     keySerde.configure(propertyMap, true);
52     JsonSerde<?> valueSerde = new JsonSerde<>();
53     valueSerde.configure(propertyMap, false);
54
55     testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
56
57     in = testDriver.createInputTopic(
58         IN,
59         (JsonSerializer<Key>)keySerde.serializer(),
60         (JsonSerializer<Entry>)valueSerde.serializer());
61
62     out = testDriver.createOutputTopic(
63         OUT,
64         (JsonDeserializer<User>)keySerde.deserializer(),
65         (JsonDeserializer<Ranking>)valueSerde.deserializer());
66
67   }
68
69
70   @Test
71   public void test()
72   {
73     Stream
74         .of(TestData.INPUT_MESSAGES)
75         .forEach(kv -> in.pipeInput(
76             Key.of(kv.key.getUser(), kv.key.getWord()),
77             Entry.of(kv.value.getWord(), kv.value.getCounter())));
78
79     MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
80     out
81         .readRecordsToList()
82         .forEach(record ->
83         {
84           log.debug(
85               "OUT: {} -> {}, {}, {}",
86               record.key(),
87               record.value(),
88               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
89               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
90           receivedMessages.add(record.key(), record.value());
91         });
92
93     TestData.assertExpectedMessages(receivedMessages);
94   }
95
96   @AfterEach
97   public void tearDown()
98   {
99     testDriver.close();
100   }
101 }