1 package de.juplo.kafka.wordcount.top10;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.TestInputTopic;
5 import org.apache.kafka.streams.TestOutputTopic;
6 import org.apache.kafka.streams.Topology;
7 import org.apache.kafka.streams.TopologyTestDriver;
8 import org.apache.kafka.streams.state.KeyValueStore;
9 import org.apache.kafka.streams.state.Stores;
10 import org.junit.jupiter.api.AfterEach;
11 import org.junit.jupiter.api.BeforeEach;
12 import org.junit.jupiter.api.Test;
13 import org.springframework.kafka.support.serializer.JsonDeserializer;
14 import org.springframework.kafka.support.serializer.JsonSerde;
15 import org.springframework.kafka.support.serializer.JsonSerializer;
16 import org.springframework.util.LinkedMultiValueMap;
17 import org.springframework.util.MultiValueMap;
20 import java.util.Properties;
21 import java.util.stream.Stream;
23 import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
24 import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
25 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
26 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
30 public class Top10StreamProcessorTopologyTest
/**
 * Topic-name constant with the value "TEST-IN".
 * NOTE(review): presumably the name handed to createInputTopic — the
 * argument line is not visible in this excerpt, confirm.
 */
32 public static final String IN = "TEST-IN";
/**
 * Topic-name constant with the value "TEST-OUT".
 * NOTE(review): presumably the name handed to createOutputTopic — the
 * argument line is not visible in this excerpt, confirm.
 */
33 public static final String OUT = "TEST-OUT";
/**
 * Name of the state store: used both to create the in-memory key-value
 * store passed to buildTopology and to look the store up again through
 * the test driver when the final state is asserted.
 */
34 public static final String STORE_NAME = "TOPOLOGY-TEST";
/**
 * Driver created from the topology under test and the serialization
 * properties; it provides the test topics and the state-store lookup.
 */
37 TopologyTestDriver testDriver;
/** Input topic obtained from the test driver; the test pipes Key/Entry records into it. */
38 TestInputTopic<Key, Entry> in;
/** Output topic obtained from the test driver, typed User (key) to Ranking (value). */
39 TestOutputTopic<User, Ranking> out;
45 Topology topology = Top10StreamProcessor.buildTopology(
48 Stores.inMemoryKeyValueStore(STORE_NAME));
50 Map<String, Object> propertyMap = serializationConfig();
52 Properties properties = new Properties();
53 properties.putAll(propertyMap);
55 JsonSerde<?> keySerde = new JsonSerde<>();
56 keySerde.configure(propertyMap, true);
57 JsonSerde<?> valueSerde = new JsonSerde<>();
58 valueSerde.configure(propertyMap, false);
60 testDriver = new TopologyTestDriver(topology, properties);
62 in = testDriver.createInputTopic(
64 (JsonSerializer<Key>)keySerde.serializer(),
65 (JsonSerializer<Entry>)valueSerde.serializer());
67 out = testDriver.createOutputTopic(
69 (JsonDeserializer<User>)keySerde.deserializer(),
70 (JsonDeserializer<Ranking>)valueSerde.deserializer());
79 .of(TestData.INPUT_MESSAGES)
80 .forEach(kv -> in.pipeInput(
81 Key.of(kv.key.getUser(), kv.key.getWord()),
82 Entry.of(kv.value.getWord(), kv.value.getCounter())));
84 MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
90 "OUT: {} -> {}, {}, {}",
93 parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
94 parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
95 receivedMessages.add(record.key(), record.value());
98 TestData.assertExpectedMessages(receivedMessages);
100 TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
101 TestData.assertExpectedLastMessagesForUsers(receivedMessages);
103 KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
104 TestData.assertExpectedState(store);
108 public void tearDown()