--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+
+
+@Slf4j
+public class Top10StreamProcessorTopologyTest
+{
+ public final static String IN = "TEST-IN";
+ public final static String OUT = "TEST-OUT";
+
+ @Test
+ public void test()
+ {
+ Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
+
+ Top10ApplicationConfiguration applicationConfiguriation =
+ new Top10ApplicationConfiguration();
+ Properties streamProcessorProperties =
+ applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
+ Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+ JsonSerde<?> keySerde = new JsonSerde<>();
+ keySerde.configure(propertyMap, true);
+ JsonSerde<?> valueSerde = new JsonSerde<>();
+ valueSerde.configure(propertyMap, false);
+
+ TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+
+ TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
+ IN,
+ (JsonSerializer<Key>)keySerde.serializer(),
+ (JsonSerializer<Entry>)valueSerde.serializer());
+
+ TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
+ OUT,
+ (JsonDeserializer<String>)keySerde.deserializer(),
+ (JsonDeserializer<Ranking>)valueSerde.deserializer());
+
+ Stream
+ .of(TestData.INPUT_MESSAGES)
+ .forEach(kv -> in.pipeInput(
+ Key.of(kv.key.getUser(), kv.key.getWord()),
+ Entry.of(kv.value.getWord(), kv.value.getCounter())));
+
+ MultiValueMap<String, Ranking> receivedMessages = new LinkedMultiValueMap<>();
+ out
+ .readRecordsToList()
+ .forEach(record ->
+ {
+ log.debug(
+ "OUT: {} -> {}, {}, {}",
+ record.key(),
+ record.value(),
+ parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+ parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+ receivedMessages.add(record.key(), record.value());
+ });
+
+ TestData.assertExpectedMessages(receivedMessages);
+
+ testDriver.close();
+ }
+}