WIP
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.junit.jupiter.api.Test;
6 import org.springframework.kafka.support.serializer.JsonDeserializer;
7 import org.springframework.kafka.support.serializer.JsonSerde;
8 import org.springframework.kafka.support.serializer.JsonSerializer;
9
10 import java.util.List;
11 import java.util.Map;
12 import java.util.Properties;
13
14 import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
15 import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
16 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
17 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
18
19
20 @Slf4j
21 public class Top10StreamProcessorTopologyTest
22 {
23   public final static String IN = "TEST-IN";
24   public final static String OUT = "TEST-OUT";
25
26   @Test
27   public void test()
28   {
29     Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
30
31     Top10ApplicationConfiguration applicationConfiguriation =
32         new Top10ApplicationConfiguration();
33     Properties streamProcessorProperties =
34         applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
35     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
36
37     JsonSerde<?> keySerde = new JsonSerde<>();
38     keySerde.configure(propertyMap, true);
39     JsonSerde<?> valueSerde = new JsonSerde<>();
40     valueSerde.configure(propertyMap, false);
41
42     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
43
44     TestInputTopic<Key, Counter> in = testDriver.createInputTopic(
45         IN,
46         (JsonSerializer<Key>)keySerde.serializer(),
47         (JsonSerializer<Counter>)valueSerde.serializer());
48
49     TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
50         OUT,
51         (JsonDeserializer<String>)keySerde.deserializer(),
52         (JsonDeserializer<Ranking>)valueSerde.deserializer());
53
54     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
55
56     List<KeyValue<String, Ranking>> receivedMessages = out
57         .readRecordsToList()
58         .stream()
59         .map(record ->
60         {
61           log.debug(
62               "OUT: {} -> {}, {}, {}",
63               record.key(),
64               record.value(),
65               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
66               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
67           return KeyValue.pair(record.key(), record.value());
68         })
69         .toList();
70
71     TestData.assertExpectedResult(receivedMessages);
72   }
73 }