top10: 1.1.2 - (RED) Added IT for the expected processing
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.apache.kafka.streams.state.Stores;
6 import org.junit.jupiter.api.Test;
7 import org.springframework.kafka.support.serializer.JsonDeserializer;
8 import org.springframework.kafka.support.serializer.JsonSerde;
9 import org.springframework.kafka.support.serializer.JsonSerializer;
10
11 import java.time.Instant;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.Properties;
15 import java.util.function.BiConsumer;
16
17 import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
18 import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
19 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
20 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
21
22
23 @Slf4j
24 public class Top10StreamProcessorTopologyTest
25 {
26   public final static String IN = "TEST-IN";
27   public final static String OUT = "TEST-OUT";
28
29   @Test
30   public void test()
31   {
32     Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
33
34     Top10ApplicationConfiguration applicationConfiguriation =
35         new Top10ApplicationConfiguration();
36     Properties streamProcessorProperties =
37         applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
38     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
39
40     JsonSerde<?> keySerde = new JsonSerde<>();
41     keySerde.configure(propertyMap, true);
42     JsonSerde<?> valueSerde = new JsonSerde<>();
43     valueSerde.configure(propertyMap, false);
44
45     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
46
47     TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
48         IN,
49         (JsonSerializer<Key>)keySerde.serializer(),
50         (JsonSerializer<Entry>)valueSerde.serializer());
51
52     TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
53         OUT,
54         (JsonDeserializer<String>)keySerde.deserializer(),
55         (JsonDeserializer<Ranking>)valueSerde.deserializer());
56
57     TestData.writeInputData(new BiConsumer<Key, Counter>()
58     {
59       private Instant timestamp = Instant.now();
60
61       @Override
62       public void accept(Key key, Counter value)
63       {
64         in.pipeInput(
65             key,
66             Entry.of(value.getWord(), value.getCounter()),
67             timestamp);
68         timestamp = timestamp.plusMillis(500);
69       }
70     });
71
72     List<KeyValue<String, Ranking>> receivedMessages = out
73         .readRecordsToList()
74         .stream()
75         .map(record ->
76         {
77           log.debug(
78               "OUT: {} -> {}, {}, {}",
79               record.key(),
80               record.value(),
81               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
82               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
83           return KeyValue.pair(record.key(), record.value());
84         })
85         .toList();
86
87     TestData.assertExpectedResult(receivedMessages);
88
89     testDriver.close();
90   }
91 }