X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessorTopologyTest.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessorTopologyTest.java;h=5e2d7299f496b62cda9d568ae37a9b9422fb639e;hb=836368acb5e435a733df5893e477a406daeafcb3;hp=0000000000000000000000000000000000000000;hpb=82a2e30072861bc8a3c19d51ebca158a3331b5d9;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java new file mode 100644 index 0000000..5e2d729 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java @@ -0,0 +1,73 @@ +package de.juplo.kafka.wordcount.splitter; + +import de.juplo.kafka.wordcount.counter.TestOutputUser; +import de.juplo.kafka.wordcount.counter.TestOutputWord; +import de.juplo.kafka.wordcount.recorder.TestInputRecording; +import de.juplo.kafka.wordcount.recorder.TestInputUser; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationConfiguration.serializationConfig; + + +@Slf4j +public class SplitterStreamProcessorTopologyTest +{ + public final static String IN = "TEST-IN"; + public final static String OUT = "TEST-OUT"; + + + TopologyTestDriver testDriver; + TestInputTopic in; + TestOutputTopic out; + + + @BeforeEach + public void setUp() + { + testDriver = new TopologyTestDriver( + SplitterStreamProcessor.buildTopology(IN, OUT), + serializationConfig()); + + in = testDriver.createInputTopic( + IN, + new JsonSerializer().noTypeInfo(), + new JsonSerializer().noTypeInfo()); + + out = testDriver.createOutputTopic( + OUT, + new JsonDeserializer().copyWithType(TestOutputUser.class), + new JsonDeserializer().copyWithType(TestOutputWord.class)); + } + + + @Test + public void test() + { + TestData + .getInputMessages() + .forEach(kv -> in.pipeInput(kv.key, kv.value)); + + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + out + .readRecordsToList() + .forEach(record -> receivedMessages.add(record.key(), record.value())); + + TestData.assertExpectedMessages(receivedMessages); + } + + @AfterEach + public void tearDown() + { + testDriver.close(); + } +}