splitter: 1.2.0 - Added `SplitterStreamProcessorTopologyTest`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / splitter / SplitterStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.splitter;
2
3 import de.juplo.kafka.wordcount.counter.TestOutputUser;
4 import de.juplo.kafka.wordcount.counter.TestOutputWord;
5 import de.juplo.kafka.wordcount.recorder.TestInputRecording;
6 import de.juplo.kafka.wordcount.recorder.TestInputUser;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.TopologyTestDriver;
11 import org.junit.jupiter.api.AfterEach;
12 import org.junit.jupiter.api.BeforeEach;
13 import org.junit.jupiter.api.Test;
14 import org.springframework.kafka.support.serializer.JsonDeserializer;
15 import org.springframework.kafka.support.serializer.JsonSerializer;
16 import org.springframework.util.LinkedMultiValueMap;
17 import org.springframework.util.MultiValueMap;
18
19 import static de.juplo.kafka.wordcount.splitter.SplitterApplicationConfiguration.serializationConfig;
20
21
22 @Slf4j
23 public class SplitterStreamProcessorTopologyTest
24 {
25   public final static String IN = "TEST-IN";
26   public final static String OUT = "TEST-OUT";
27
28
29   TopologyTestDriver testDriver;
30   TestInputTopic<TestInputUser, TestInputRecording> in;
31   TestOutputTopic<TestOutputUser, TestOutputWord> out;
32
33
34   @BeforeEach
35   public void setUp()
36   {
37     testDriver = new TopologyTestDriver(
38         SplitterStreamProcessor.buildTopology(IN, OUT),
39         serializationConfig());
40
41     in = testDriver.createInputTopic(
42         IN,
43         new JsonSerializer().noTypeInfo(),
44         new JsonSerializer().noTypeInfo());
45
46     out = testDriver.createOutputTopic(
47         OUT,
48         new JsonDeserializer().copyWithType(TestOutputUser.class),
49         new JsonDeserializer().copyWithType(TestOutputWord.class));
50   }
51
52
53   @Test
54   public void test()
55   {
56     TestData
57         .getInputMessages()
58         .forEach(kv -> in.pipeInput(kv.key, kv.value));
59
60     MultiValueMap<TestOutputUser, TestOutputWord> receivedMessages = new LinkedMultiValueMap<>();
61     out
62         .readRecordsToList()
63         .forEach(record -> receivedMessages.add(record.key(), record.value()));
64
65     TestData.assertExpectedMessages(receivedMessages);
66   }
67
68   @AfterEach
69   public void tearDown()
70   {
71     testDriver.close();
72   }
73 }