1 package de.juplo.kafka.wordcount.top10;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.TestInputTopic;
5 import org.apache.kafka.streams.TestOutputTopic;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.TopologyTestDriver;
8 import org.apache.kafka.streams.state.KeyValueStore;
9 import org.apache.kafka.streams.state.Stores;
10 import org.junit.jupiter.api.AfterEach;
11 import org.junit.jupiter.api.BeforeEach;
12 import org.junit.jupiter.api.Test;
13 import org.springframework.kafka.support.serializer.JsonDeserializer;
14 import org.springframework.kafka.support.serializer.JsonSerde;
15 import org.springframework.kafka.support.serializer.JsonSerializer;
16 import org.springframework.util.LinkedMultiValueMap;
17 import org.springframework.util.MultiValueMap;
20 import java.util.Properties;
21 import java.util.stream.Stream;
23 import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
24 import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
25 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
26 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
30 public class Top10StreamProcessorTopologyTest
32 public static final String IN = "TEST-IN";
33 public static final String OUT = "TEST-OUT";
34 public static final String STORE_NAME = "TOPOLOGY-TEST";
37 TopologyTestDriver testDriver;
38 TestInputTopic<Key, Entry> in;
39 TestOutputTopic<User, Ranking> out;
45 Topology topology = Top10StreamProcessor.buildTopology(
48 Stores.inMemoryKeyValueStore(STORE_NAME));
50 Top10ApplicationConfiguration applicationConfiguriation =
51 new Top10ApplicationConfiguration();
52 Properties streamProcessorProperties =
53 applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
54 Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
56 JsonSerde<?> keySerde = new JsonSerde<>();
57 keySerde.configure(propertyMap, true);
58 JsonSerde<?> valueSerde = new JsonSerde<>();
59 valueSerde.configure(propertyMap, false);
61 testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
63 in = testDriver.createInputTopic(
65 (JsonSerializer<Key>)keySerde.serializer(),
66 (JsonSerializer<Entry>)valueSerde.serializer());
68 out = testDriver.createOutputTopic(
70 (JsonDeserializer<User>)keySerde.deserializer(),
71 (JsonDeserializer<Ranking>)valueSerde.deserializer());
80 .of(TestData.INPUT_MESSAGES)
81 .forEach(kv -> in.pipeInput(
82 Key.of(kv.key.getUser(), kv.key.getWord()),
83 Entry.of(kv.value.getWord(), kv.value.getCounter())));
85 MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
91 "OUT: {} -> {}, {}, {}",
94 parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
95 parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
96 receivedMessages.add(record.key(), record.value());
99 TestData.assertExpectedMessages(receivedMessages);
101 TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
102 TestData.assertExpectedLastMessagesForUsers(receivedMessages);
104 KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
105 TestData.assertExpectedState(store);
109 public void tearDown()