WIP
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.apache.kafka.streams.state.Stores;
6 import org.junit.jupiter.api.Test;
7 import org.springframework.kafka.support.serializer.JsonDeserializer;
8 import org.springframework.kafka.support.serializer.JsonSerde;
9 import org.springframework.kafka.support.serializer.JsonSerializer;
10
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Properties;
14
15 import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
16 import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
17 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
18 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
19
20
21 @Slf4j
22 public class Top10StreamProcessorTopologyTest
23 {
24   public final static String IN = "TEST-IN";
25   public final static String OUT = "TEST-OUT";
26
27   @Test
28   public void test()
29   {
30     Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
31
32     Top10ApplicationConfiguriation applicationConfiguriation =
33         new Top10ApplicationConfiguriation();
34     Properties streamProcessorProperties =
35         applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
36     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
37
38     JsonSerde<?> keySerde = new JsonSerde<>();
39     keySerde.configure(propertyMap, true);
40     JsonSerde<?> valueSerde = new JsonSerde<>();
41     valueSerde.configure(propertyMap, false);
42
43     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
44
45     TestInputTopic<String, Word> in = testDriver.createInputTopic(
46         IN,
47         (JsonSerializer<String>)keySerde.serializer(),
48         (JsonSerializer<Word>)valueSerde.serializer());
49
50     TestOutputTopic<Word, WordCounter> out = testDriver.createOutputTopic(
51         OUT,
52         (JsonDeserializer<Word>)keySerde.deserializer(),
53         (JsonDeserializer<WordCounter>)valueSerde.deserializer());
54
55     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
56
57     List<KeyValue<Word, WordCounter>> receivedMessages = out
58         .readRecordsToList()
59         .stream()
60         .map(record ->
61         {
62           log.debug(
63               "OUT: {} -> {}, {}, {}",
64               record.key(),
65               record.value(),
66               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
67               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
68           return KeyValue.pair(record.key(), record.value());
69         })
70         .toList();
71
72     TestData.assertExpectedResult(receivedMessages);
73   }
74 }