counter: 1.3.1 - Cleand code/setup for tests
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import de.juplo.kafka.wordcount.splitter.TestInputUser;
4 import de.juplo.kafka.wordcount.splitter.TestInputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWord;
6 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterEach;
15 import org.junit.jupiter.api.BeforeEach;
16 import org.junit.jupiter.api.Test;
17 import org.springframework.kafka.support.serializer.JsonDeserializer;
18 import org.springframework.kafka.support.serializer.JsonSerializer;
19 import org.springframework.util.LinkedMultiValueMap;
20 import org.springframework.util.MultiValueMap;
21
22 import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
23 import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
24
25
26 @Slf4j
27 public class CounterStreamProcessorTopologyTest
28 {
29   public static final String IN = "TEST-IN";
30   public static final String OUT = "TEST-OUT";
31
32
33   TopologyTestDriver testDriver;
34   TestInputTopic<TestInputUser, TestInputWord> in;
35   TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
36
37
38   @BeforeEach
39   public void setUpTestDriver()
40   {
41     Topology topology = CounterStreamProcessor.buildTopology(
42         IN,
43         OUT,
44         Stores.inMemoryKeyValueStore(STORE_NAME));
45
46     testDriver = new TopologyTestDriver(topology, serializationConfig());
47
48     in = testDriver.createInputTopic(
49         IN,
50         new JsonSerializer().noTypeInfo(),
51         new JsonSerializer().noTypeInfo());
52
53     out = testDriver.createOutputTopic(
54         OUT,
55         new JsonDeserializer(TestOutputWord.class).ignoreTypeHeaders(),
56         new JsonDeserializer(TestOutputWordCounter.class).ignoreTypeHeaders());
57   }
58
59
60   @Test
61   public void test()
62   {
63     TestData
64         .getInputMessages()
65         .forEach(kv -> in.pipeInput(kv.key, kv.value));
66
67     MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
68     out
69         .readRecordsToList()
70         .forEach(record -> receivedMessages.add(record.key(), record.value()));
71
72     TestData.assertExpectedMessages(receivedMessages);
73
74     TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
75     TestData.assertExpectedLastMessagesForWord(receivedMessages);
76
77     KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
78     TestData.assertExpectedState(store);
79   }
80
81   @AfterEach
82   public void tearDown()
83   {
84     testDriver.close();
85   }
86 }