1 package de.juplo.kafka.wordcount.counter;
3 import de.juplo.kafka.wordcount.splitter.TestInputWord;
4 import de.juplo.kafka.wordcount.top10.TestOutputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.common.serialization.StringSerializer;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.Stores;
13 import org.junit.jupiter.api.AfterEach;
14 import org.junit.jupiter.api.BeforeEach;
15 import org.junit.jupiter.api.Test;
16 import org.springframework.kafka.support.serializer.JsonDeserializer;
17 import org.springframework.kafka.support.serializer.JsonSerializer;
18 import org.springframework.util.LinkedMultiValueMap;
19 import org.springframework.util.MultiValueMap;
21 import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
22 import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
23 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
24 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
28 public class CounterStreamProcessorTopologyTest
30 public final static String IN = "TEST-IN";
31 public final static String OUT = "TEST-OUT";
34 TopologyTestDriver testDriver;
35 TestInputTopic<String, TestInputWord> in;
36 TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
40 public void setUpTestDriver()
42 Topology topology = CounterStreamProcessor.buildTopology(
45 Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
47 testDriver = new TopologyTestDriver(topology, serializationConfig());
49 in = testDriver.createInputTopic(
51 new StringSerializer(),
52 new JsonSerializer().noTypeInfo());
54 out = testDriver.createOutputTopic(
56 new JsonDeserializer()
57 .copyWithType(TestOutputWord.class)
59 new JsonDeserializer()
60 .copyWithType(TestOutputWordCounter.class)
61 .ignoreTypeHeaders());
70 .forEach(word -> in.pipeInput(word.getUser(), word));
72 MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
78 "OUT: {} -> {}, {}, {}",
81 parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
82 parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
83 receivedMessages.add(record.key(), record.value());
86 TestData.assertExpectedMessages(receivedMessages);
90 public void tearDown()