8ecf9fafb003db060c1f773ce7649f7a39a8e09a
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.TestInputTopic;
5 import org.apache.kafka.streams.TestOutputTopic;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.TopologyTestDriver;
8 import org.junit.jupiter.api.Test;
9 import org.springframework.kafka.support.serializer.JsonDeserializer;
10 import org.springframework.kafka.support.serializer.JsonSerde;
11 import org.springframework.kafka.support.serializer.JsonSerializer;
12 import org.springframework.util.LinkedMultiValueMap;
13 import org.springframework.util.MultiValueMap;
14
15 import java.util.Map;
16 import java.util.Properties;
17 import java.util.stream.Stream;
18
19 import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
20 import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
21 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
22 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
23
24
25 @Slf4j
26 public class Top10StreamProcessorTopologyTest
27 {
28   public final static String IN = "TEST-IN";
29   public final static String OUT = "TEST-OUT";
30
31   @Test
32   public void test()
33   {
34     Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
35
36     Top10ApplicationConfiguration applicationConfiguriation =
37         new Top10ApplicationConfiguration();
38     Properties streamProcessorProperties =
39         applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
40     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
41
42     JsonSerde<?> keySerde = new JsonSerde<>();
43     keySerde.configure(propertyMap, true);
44     JsonSerde<?> valueSerde = new JsonSerde<>();
45     valueSerde.configure(propertyMap, false);
46
47     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
48
49     TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
50         IN,
51         (JsonSerializer<Key>)keySerde.serializer(),
52         (JsonSerializer<Entry>)valueSerde.serializer());
53
54     TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
55         OUT,
56         (JsonDeserializer<String>)keySerde.deserializer(),
57         (JsonDeserializer<Ranking>)valueSerde.deserializer());
58
59     Stream
60         .of(TestData.INPUT_MESSAGES)
61         .forEach(kv -> in.pipeInput(
62             Key.of(kv.key.getUser(), kv.key.getWord()),
63             Entry.of(kv.value.getWord(), kv.value.getCounter())));
64
65     MultiValueMap<String, Ranking> receivedMessages = new LinkedMultiValueMap<>();
66     out
67         .readRecordsToList()
68         .forEach(record ->
69         {
70           log.debug(
71               "OUT: {} -> {}, {}, {}",
72               record.key(),
73               record.value(),
74               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
75               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
76           receivedMessages.add(record.key(), record.value());
77         });
78
79     TestData.assertExpectedMessages(receivedMessages);
80
81     testDriver.close();
82   }
83 }