From 836368acb5e435a733df5893e477a406daeafcb3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 3 Jun 2024 18:01:55 +0200 Subject: [PATCH] splitter: 1.2.0 - Added `SplitterStreamProcessorTopologyTest` --- .../SplitterStreamProcessorTopologyTest.java | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java new file mode 100644 index 0000000..5e2d729 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java @@ -0,0 +1,73 @@ +package de.juplo.kafka.wordcount.splitter; + +import de.juplo.kafka.wordcount.counter.TestOutputUser; +import de.juplo.kafka.wordcount.counter.TestOutputWord; +import de.juplo.kafka.wordcount.recorder.TestInputRecording; +import de.juplo.kafka.wordcount.recorder.TestInputUser; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationConfiguration.serializationConfig; + + +@Slf4j +public class SplitterStreamProcessorTopologyTest +{ + public final static String IN = "TEST-IN"; + public final static String OUT = "TEST-OUT"; + + + TopologyTestDriver testDriver; + TestInputTopic in; + TestOutputTopic out; + + + @BeforeEach + public void setUp() + { + testDriver = new TopologyTestDriver( + SplitterStreamProcessor.buildTopology(IN, OUT), + serializationConfig()); + + in = testDriver.createInputTopic( + IN, + new JsonSerializer().noTypeInfo(), + new JsonSerializer().noTypeInfo()); + + out = testDriver.createOutputTopic( + OUT, + new JsonDeserializer().copyWithType(TestOutputUser.class), + new JsonDeserializer().copyWithType(TestOutputWord.class)); + } + + + @Test + public void test() + { + TestData + .getInputMessages() + .forEach(kv -> in.pipeInput(kv.key, kv.value)); + + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + out + .readRecordsToList() + .forEach(record -> receivedMessages.add(record.key(), record.value())); + + TestData.assertExpectedMessages(receivedMessages); + } + + @AfterEach + public void tearDown() + { + testDriver.close(); + } +} -- 2.20.1