1 package de.juplo.kafka.wordcount.splitter;
3 import de.juplo.kafka.wordcount.counter.TestOutputUser;
4 import de.juplo.kafka.wordcount.counter.TestOutputWord;
5 import de.juplo.kafka.wordcount.recorder.TestInputRecording;
6 import de.juplo.kafka.wordcount.recorder.TestInputUser;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.TopologyTestDriver;
11 import org.junit.jupiter.api.AfterEach;
12 import org.junit.jupiter.api.BeforeEach;
13 import org.junit.jupiter.api.Test;
14 import org.springframework.kafka.support.serializer.JsonDeserializer;
15 import org.springframework.kafka.support.serializer.JsonSerializer;
16 import org.springframework.util.LinkedMultiValueMap;
17 import org.springframework.util.MultiValueMap;
19 import static de.juplo.kafka.wordcount.splitter.SplitterApplicationConfiguration.serializationConfig;
23 public class SplitterStreamProcessorTopologyTest
25 public final static String IN = "TEST-IN";
26 public final static String OUT = "TEST-OUT";
29 TopologyTestDriver testDriver;
30 TestInputTopic<TestInputUser, TestInputRecording> in;
31 TestOutputTopic<TestOutputUser, TestOutputWord> out;
37 testDriver = new TopologyTestDriver(
38 SplitterStreamProcessor.buildTopology(IN, OUT),
39 serializationConfig());
41 in = testDriver.createInputTopic(
43 new JsonSerializer().noTypeInfo(),
44 new JsonSerializer().noTypeInfo());
46 out = testDriver.createOutputTopic(
48 new JsonDeserializer().copyWithType(TestOutputUser.class),
49 new JsonDeserializer().copyWithType(TestOutputWord.class));
58 .forEach(kv -> in.pipeInput(kv.key, kv.value));
60 MultiValueMap<TestOutputUser, TestOutputWord> receivedMessages = new LinkedMultiValueMap<>();
63 .forEach(record -> receivedMessages.add(record.key(), record.value()));
65 TestData.assertExpectedMessages(receivedMessages);
69 public void tearDown()