1 package de.juplo.kafka.wordcount.top10;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.TestInputTopic;
5 import org.apache.kafka.streams.TestOutputTopic;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.TopologyTestDriver;
8 import org.junit.jupiter.api.AfterEach;
9 import org.junit.jupiter.api.BeforeEach;
10 import org.junit.jupiter.api.Test;
11 import org.springframework.kafka.support.serializer.JsonDeserializer;
12 import org.springframework.kafka.support.serializer.JsonSerde;
13 import org.springframework.kafka.support.serializer.JsonSerializer;
14 import org.springframework.util.LinkedMultiValueMap;
15 import org.springframework.util.MultiValueMap;
18 import java.util.Properties;
19 import java.util.stream.Stream;
21 import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
22 import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
23 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
24 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
28 public class Top10StreamProcessorTopologyTest
// Topic names handed to Top10StreamProcessor.buildTopology(...) when the topology under test is built.
30 public final static String IN = "TEST-IN";
31 public final static String OUT = "TEST-OUT";
// Test driver created from the topology and the stream-processor properties.
34 TopologyTestDriver testDriver;
// Input topic obtained from the test driver; records are piped in as Key/Entry pairs.
35 TestInputTopic<Key, Entry> in;
// Output topic obtained from the test driver; records carry User keys and Ranking values.
36 TestOutputTopic<User, Ranking> out;
// Build the topology under test from the IN and OUT topic names.
42 Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
// Derive the stream-processor properties from a freshly constructed application
// configuration and freshly constructed Top10ApplicationProperties.
44 Top10ApplicationConfiguration applicationConfiguriation =
45 new Top10ApplicationConfiguration();
46 Properties streamProcessorProperties =
47 applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
// The serdes below are configured from a Map view of the same properties.
48 Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
// One JsonSerde is configured as key serde (second argument true), the other as
// value serde (second argument false); both receive the same property map.
50 JsonSerde<?> keySerde = new JsonSerde<>();
51 keySerde.configure(propertyMap, true);
52 JsonSerde<?> valueSerde = new JsonSerde<>();
53 valueSerde.configure(propertyMap, false);
// The driver is given the Properties object itself, not the converted map.
55 testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
// The casts narrow the serializers of the wildcard-typed serdes (JsonSerde<?>) to the
// key/value types declared on the input topic field.
57 in = testDriver.createInputTopic(
59 (JsonSerializer<Key>)keySerde.serializer(),
60 (JsonSerializer<Entry>)valueSerde.serializer());
// Same serdes on the output side: their deserializers are cast to the key/value types
// declared on the output topic field.
62 out = testDriver.createOutputTopic(
64 (JsonDeserializer<User>)keySerde.deserializer(),
65 (JsonDeserializer<Ranking>)valueSerde.deserializer());
// Pipe every element of TestData.INPUT_MESSAGES into the input topic, rebuilding the
// key via Key.of(user, word) and the value via Entry.of(word, counter).
74 .of(TestData.INPUT_MESSAGES)
75 .forEach(kv -> in.pipeInput(
76 Key.of(kv.key.getUser(), kv.key.getWord()),
77 Entry.of(kv.value.getWord(), kv.value.getCounter())));
// Received records are collected per key; a MultiValueMap keeps every value added for a key.
79 MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
// The log arguments include the record headers named by KEY_DEFAULT_CLASSID_FIELD_NAME and
// DEFAULT_CLASSID_FIELD_NAME, as returned by TestData.parseHeader.
85 "OUT: {} -> {}, {}, {}",
88 parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
89 parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
90 receivedMessages.add(record.key(), record.value());
// Verification of the collected messages is delegated to TestData.
93 TestData.assertExpectedMessages(receivedMessages);
97 public void tearDown()