--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import de.juplo.kafka.wordcount.counter.TestOutputUser;
+import de.juplo.kafka.wordcount.counter.TestOutputWord;
+import de.juplo.kafka.wordcount.recorder.TestInputRecording;
+import de.juplo.kafka.wordcount.recorder.TestInputUser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import static de.juplo.kafka.wordcount.splitter.SplitterApplicationConfiguration.serializationConfig;
+
+
+@Slf4j
+public class SplitterStreamProcessorTopologyTest
+{
+ public final static String IN = "TEST-IN";
+ public final static String OUT = "TEST-OUT";
+
+
+ TopologyTestDriver testDriver;
+ TestInputTopic<TestInputUser, TestInputRecording> in;
+ TestOutputTopic<TestOutputUser, TestOutputWord> out;
+
+
+ @BeforeEach
+ public void setUp()
+ {
+ testDriver = new TopologyTestDriver(
+ SplitterStreamProcessor.buildTopology(IN, OUT),
+ serializationConfig());
+
+ in = testDriver.createInputTopic(
+ IN,
+ new JsonSerializer().noTypeInfo(),
+ new JsonSerializer().noTypeInfo());
+
+ out = testDriver.createOutputTopic(
+ OUT,
+ new JsonDeserializer().copyWithType(TestOutputUser.class),
+ new JsonDeserializer().copyWithType(TestOutputWord.class));
+ }
+
+
+ @Test
+ public void test()
+ {
+ TestData
+ .getInputMessages()
+ .forEach(kv -> in.pipeInput(kv.key, kv.value));
+
+ MultiValueMap<TestOutputUser, TestOutputWord> receivedMessages = new LinkedMultiValueMap<>();
+ out
+ .readRecordsToList()
+ .forEach(record -> receivedMessages.add(record.key(), record.value()));
+
+ TestData.assertExpectedMessages(receivedMessages);
+ }
+
+ @AfterEach
+ public void tearDown()
+ {
+ testDriver.close();
+ }
+}