top10: 1.1.2 - (RED) Added test, that asserts the expectated processing
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10StreamProcessorTopologyTest.java
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
new file mode 100644 (file)
index 0000000..8ecf9fa
--- /dev/null
@@ -0,0 +1,83 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+
+
+@Slf4j
+public class Top10StreamProcessorTopologyTest
+{
+  public final static String IN = "TEST-IN";
+  public final static String OUT = "TEST-OUT";
+
+  @Test
+  public void test()
+  {
+    Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
+
+    Top10ApplicationConfiguration applicationConfiguriation =
+        new Top10ApplicationConfiguration();
+    Properties streamProcessorProperties =
+        applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
+    Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+    JsonSerde<?> keySerde = new JsonSerde<>();
+    keySerde.configure(propertyMap, true);
+    JsonSerde<?> valueSerde = new JsonSerde<>();
+    valueSerde.configure(propertyMap, false);
+
+    TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+
+    TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
+        IN,
+        (JsonSerializer<Key>)keySerde.serializer(),
+        (JsonSerializer<Entry>)valueSerde.serializer());
+
+    TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
+        OUT,
+        (JsonDeserializer<String>)keySerde.deserializer(),
+        (JsonDeserializer<Ranking>)valueSerde.deserializer());
+
+    Stream
+        .of(TestData.INPUT_MESSAGES)
+        .forEach(kv -> in.pipeInput(
+            Key.of(kv.key.getUser(), kv.key.getWord()),
+            Entry.of(kv.value.getWord(), kv.value.getCounter())));
+
+    MultiValueMap<String, Ranking> receivedMessages = new LinkedMultiValueMap<>();
+    out
+        .readRecordsToList()
+        .forEach(record ->
+        {
+          log.debug(
+              "OUT: {} -> {}, {}, {}",
+              record.key(),
+              record.value(),
+              parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+              parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+          receivedMessages.add(record.key(), record.value());
+        });
+
+    TestData.assertExpectedMessages(receivedMessages);
+
+    testDriver.close();
+  }
+}