1 package de.juplo.kafka.wordcount.top10;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.TestInputTopic;
5 import org.apache.kafka.streams.TestOutputTopic;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.TopologyTestDriver;
8 import org.junit.jupiter.api.Test;
9 import org.springframework.kafka.support.serializer.JsonDeserializer;
10 import org.springframework.kafka.support.serializer.JsonSerde;
11 import org.springframework.kafka.support.serializer.JsonSerializer;
12 import org.springframework.util.LinkedMultiValueMap;
13 import org.springframework.util.MultiValueMap;
16 import java.util.Properties;
17 import java.util.stream.Stream;
19 import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
20 import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
21 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
22 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
/**
 * Exercises the topology returned by {@code Top10StreamProcessor.buildTopology(IN, OUT)}
 * with a {@link TopologyTestDriver}: the messages in {@code TestData.INPUT_MESSAGES}
 * are piped into an input topic, everything read from the output topic is collected
 * per key, and the collected result is handed to
 * {@code TestData.assertExpectedMessages(...)} for verification.
 */
26 public class Top10StreamProcessorTopologyTest
// Topic names that are passed to buildTopology(...) below.
28 public final static String IN = "TEST-IN";
29 public final static String OUT = "TEST-OUT";
// Build the topology under test against the two test topic names.
34 Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
// Obtain the stream-processor properties from the application configuration,
// using a freshly constructed (i.e. unmodified) Top10ApplicationProperties instance.
36 Top10ApplicationConfiguration applicationConfiguriation =
37 new Top10ApplicationConfiguration();
38 Properties streamProcessorProperties =
39 applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
// The serdes are configured from a Map view of the same properties.
// NOTE(review): no import of java.util.Map is visible in this excerpt -- confirm it exists.
40 Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
// One JSON serde configured as key serde (isKey = true) ...
42 JsonSerde<?> keySerde = new JsonSerde<>();
43 keySerde.configure(propertyMap, true);
// ... and one configured as value serde (isKey = false).
44 JsonSerde<?> valueSerde = new JsonSerde<>();
45 valueSerde.configure(propertyMap, false);
// Driver that runs the topology with the same properties that configured the serdes.
// NOTE(review): no close() call on the driver is visible in this excerpt -- confirm it is released.
47 TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
// Input topic typed Key -> Entry. The serializers come from the wildcard-typed serdes,
// hence the casts. (The topic-name argument is not visible in this excerpt.)
49 TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
51 (JsonSerializer<Key>)keySerde.serializer(),
52 (JsonSerializer<Entry>)valueSerde.serializer());
// Output topic typed String -> Ranking, using the deserializers of the same two serdes.
// (The topic-name argument is not visible in this excerpt.)
54 TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
56 (JsonDeserializer<String>)keySerde.deserializer(),
57 (JsonDeserializer<Ranking>)valueSerde.deserializer());
// Pipe every message of TestData.INPUT_MESSAGES into the input topic, rebuilding
// the key as Key.of(user, word) and the value as Entry.of(word, counter).
60 .of(TestData.INPUT_MESSAGES)
61 .forEach(kv -> in.pipeInput(
62 Key.of(kv.key.getUser(), kv.key.getWord()),
63 Entry.of(kv.value.getWord(), kv.value.getCounter())));
// Collects the received rankings, grouped by record key.
65 MultiValueMap<String, Ranking> receivedMessages = new LinkedMultiValueMap<>();
// For each output record: log it together with the key/value type-id headers, then store it.
// (The statement that iterates over the output records is not visible in this excerpt.)
71 "OUT: {} -> {}, {}, {}",
74 parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
75 parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
76 receivedMessages.add(record.key(), record.value());
// The actual verification of the collected output is delegated to TestData.
79 TestData.assertExpectedMessages(receivedMessages);