counter: 1.2.15 - `TopologyTestDriver.close` must always be called
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.TestInputTopic;
5 import org.apache.kafka.streams.TestOutputTopic;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.TopologyTestDriver;
8 import org.apache.kafka.streams.state.Stores;
9 import org.junit.jupiter.api.AfterEach;
10 import org.junit.jupiter.api.BeforeEach;
11 import org.junit.jupiter.api.Test;
12 import org.springframework.kafka.support.serializer.JsonDeserializer;
13 import org.springframework.kafka.support.serializer.JsonSerde;
14 import org.springframework.kafka.support.serializer.JsonSerializer;
15 import org.springframework.util.LinkedMultiValueMap;
16 import org.springframework.util.MultiValueMap;
17
18 import java.util.Map;
19 import java.util.Properties;
20
21 import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
22 import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
23 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
24 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
25
26
27 @Slf4j
28 public class CounterStreamProcessorTopologyTest
29 {
30   public final static String IN = "TEST-IN";
31   public final static String OUT = "TEST-OUT";
32
33
34   TopologyTestDriver testDriver;
35   TestInputTopic<String, Word> in;
36   TestOutputTopic<Word, WordCounter> out;
37
38
39   @BeforeEach
40   public void setUpTestDriver()
41   {
42     Topology topology = CounterStreamProcessor.buildTopology(
43         IN,
44         OUT,
45         Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
46
47     CounterApplicationConfiguriation applicationConfiguriation =
48         new CounterApplicationConfiguriation();
49     Properties streamProcessorProperties =
50         applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
51     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
52
53     JsonSerde<?> keySerde = new JsonSerde<>();
54     keySerde.configure(propertyMap, true);
55     JsonSerde<?> valueSerde = new JsonSerde<>();
56     valueSerde.configure(propertyMap, false);
57
58     testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
59
60     in = testDriver.createInputTopic(
61         IN,
62         (JsonSerializer<String>)keySerde.serializer(),
63         (JsonSerializer<Word>)valueSerde.serializer());
64
65     out = testDriver.createOutputTopic(
66         OUT,
67         (JsonDeserializer<Word>)keySerde.deserializer(),
68         (JsonDeserializer<WordCounter>)valueSerde.deserializer());
69   }
70
71
72   @Test
73   public void test()
74   {
75     TestData.injectInputMessages((key, value) -> in.pipeInput(key, value));
76
77     MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
78     out
79         .readRecordsToList()
80         .forEach(record ->
81         {
82           log.debug(
83               "OUT: {} -> {}, {}, {}",
84               record.key(),
85               record.value(),
86               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
87               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
88           receivedMessages.add(record.key(), record.value());
89         });
90
91     TestData.assertExpectedMessages(receivedMessages);
92   }
93
94   @AfterEach
95   public void tearDown()
96   {
97     testDriver.close();
98   }
99 }