1 package de.juplo.kafka.wordcount.counter;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.TestInputTopic;
5 import org.apache.kafka.streams.TestOutputTopic;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.TopologyTestDriver;
8 import org.apache.kafka.streams.state.Stores;
9 import org.junit.jupiter.api.AfterEach;
10 import org.junit.jupiter.api.BeforeEach;
11 import org.junit.jupiter.api.Test;
12 import org.springframework.kafka.support.serializer.JsonDeserializer;
13 import org.springframework.kafka.support.serializer.JsonSerde;
14 import org.springframework.kafka.support.serializer.JsonSerializer;
15 import org.springframework.util.LinkedMultiValueMap;
16 import org.springframework.util.MultiValueMap;
19 import java.util.Properties;
21 import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
22 import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
23 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
24 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
28 public class CounterStreamProcessorTopologyTest
30 public final static String IN = "TEST-IN";
31 public final static String OUT = "TEST-OUT";
34 TopologyTestDriver testDriver;
35 TestInputTopic<String, Word> in;
36 TestOutputTopic<Word, WordCounter> out;
40 public void setUpTestDriver()
42 Topology topology = CounterStreamProcessor.buildTopology(
45 Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
47 CounterApplicationConfiguriation applicationConfiguriation =
48 new CounterApplicationConfiguriation();
49 Properties streamProcessorProperties =
50 applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
51 Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
53 JsonSerde<?> keySerde = new JsonSerde<>();
54 keySerde.configure(propertyMap, true);
55 JsonSerde<?> valueSerde = new JsonSerde<>();
56 valueSerde.configure(propertyMap, false);
58 testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
60 in = testDriver.createInputTopic(
62 (JsonSerializer<String>)keySerde.serializer(),
63 (JsonSerializer<Word>)valueSerde.serializer());
65 out = testDriver.createOutputTopic(
67 (JsonDeserializer<Word>)keySerde.deserializer(),
68 (JsonDeserializer<WordCounter>)valueSerde.deserializer());
75 TestData.injectInputMessages((key, value) -> in.pipeInput(key, value));
77 MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
83 "OUT: {} -> {}, {}, {}",
86 parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
87 parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
88 receivedMessages.add(record.key(), record.value());
91 TestData.assertExpectedMessages(receivedMessages);
95 public void tearDown()