top10: 1.2.1 - Removed logging of type-headers in tests
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.junit.jupiter.api.Test;
6 import org.springframework.kafka.support.serializer.JsonDeserializer;
7 import org.springframework.kafka.support.serializer.JsonSerde;
8 import org.springframework.kafka.support.serializer.JsonSerializer;
9
10 import java.util.List;
11 import java.util.Map;
12 import java.util.Properties;
13
14 import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
15 import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
16 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
17 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
18
19
20 @Slf4j
21 public class Top10StreamProcessorTopologyTest
22 {
23   public final static String IN = "TEST-IN";
24   public final static String OUT = "TEST-OUT";
25
26   @Test
27   public void test()
28   {
29     Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
30
31     Top10ApplicationConfiguration applicationConfiguriation =
32         new Top10ApplicationConfiguration();
33     Properties streamProcessorProperties =
34         applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
35     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
36
37     JsonSerde<?> keySerde = new JsonSerde<>();
38     keySerde.configure(propertyMap, true);
39     JsonSerde<?> valueSerde = new JsonSerde<>();
40     valueSerde.configure(propertyMap, false);
41
42     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
43
44     TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
45         IN,
46         (JsonSerializer<Key>)keySerde.serializer(),
47         (JsonSerializer<Entry>)valueSerde.serializer());
48
49     TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
50         OUT,
51         (JsonDeserializer<String>)keySerde.deserializer(),
52         (JsonDeserializer<Ranking>)valueSerde.deserializer());
53
54     TestData.writeInputData((key, value) -> in.pipeInput(
55         key,
56         Entry.of(value.getWord(), value.getCounter())));
57
58     List<KeyValue<String, Ranking>> receivedMessages = out
59         .readRecordsToList()
60         .stream()
61         .map(record ->
62         {
63           log.debug(
64               "OUT: {} -> {}, {}, {}",
65               record.key(),
66               record.value(),
67               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
68               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
69           return KeyValue.pair(record.key(), record.value());
70         })
71         .toList();
72
73     TestData.assertExpectedResult(receivedMessages);
74   }
75 }