splitter: 1.2.0 - Added `SplitterStreamProcessorTopologyTest` splitter-1.2.0
authorKai Moritz <kai@juplo.de>
Mon, 3 Jun 2024 16:01:55 +0000 (18:01 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 4 Jun 2024 21:31:36 +0000 (23:31 +0200)
src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java [new file with mode: 0644]

diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java
new file mode 100644 (file)
index 0000000..5e2d729
--- /dev/null
@@ -0,0 +1,73 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import de.juplo.kafka.wordcount.counter.TestOutputUser;
+import de.juplo.kafka.wordcount.counter.TestOutputWord;
+import de.juplo.kafka.wordcount.recorder.TestInputRecording;
+import de.juplo.kafka.wordcount.recorder.TestInputUser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import static de.juplo.kafka.wordcount.splitter.SplitterApplicationConfiguration.serializationConfig;
+
+
+@Slf4j
+public class SplitterStreamProcessorTopologyTest
+{
+  public final static String IN = "TEST-IN";
+  public final static String OUT = "TEST-OUT";
+
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<TestInputUser, TestInputRecording> in;
+  TestOutputTopic<TestOutputUser, TestOutputWord> out;
+
+
+  @BeforeEach
+  public void setUp()
+  {
+    testDriver = new TopologyTestDriver(
+        SplitterStreamProcessor.buildTopology(IN, OUT),
+        serializationConfig());
+
+    in = testDriver.createInputTopic(
+        IN,
+        new JsonSerializer().noTypeInfo(),
+        new JsonSerializer().noTypeInfo());
+
+    out = testDriver.createOutputTopic(
+        OUT,
+        new JsonDeserializer().copyWithType(TestOutputUser.class),
+        new JsonDeserializer().copyWithType(TestOutputWord.class));
+  }
+
+
+  @Test
+  public void test()
+  {
+    TestData
+        .getInputMessages()
+        .forEach(kv -> in.pipeInput(kv.key, kv.value));
+
+    MultiValueMap<TestOutputUser, TestOutputWord> receivedMessages = new LinkedMultiValueMap<>();
+    out
+        .readRecordsToList()
+        .forEach(record -> receivedMessages.add(record.key(), record.value()));
+
+    TestData.assertExpectedMessages(receivedMessages);
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
+  }
+}