1 package de.juplo.kafka.wordcount.top10;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.apache.kafka.streams.state.Stores;
6 import org.junit.jupiter.api.Test;
7 import org.springframework.kafka.support.serializer.JsonDeserializer;
8 import org.springframework.kafka.support.serializer.JsonSerde;
9 import org.springframework.kafka.support.serializer.JsonSerializer;
11 import java.time.Instant;
12 import java.util.List;
14 import java.util.Properties;
15 import java.util.function.BiConsumer;
17 import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
18 import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
19 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
20 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
24 public class Top10StreamProcessorTopologyTest
26 public final static String IN = "TEST-IN";
27 public final static String OUT = "TEST-OUT";
32 Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
34 Top10ApplicationConfiguration applicationConfiguriation =
35 new Top10ApplicationConfiguration();
36 Properties streamProcessorProperties =
37 applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
38 Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
40 JsonSerde<?> keySerde = new JsonSerde<>();
41 keySerde.configure(propertyMap, true);
42 JsonSerde<?> valueSerde = new JsonSerde<>();
43 valueSerde.configure(propertyMap, false);
45 TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
47 TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
49 (JsonSerializer<Key>)keySerde.serializer(),
50 (JsonSerializer<Entry>)valueSerde.serializer());
52 TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
54 (JsonDeserializer<String>)keySerde.deserializer(),
55 (JsonDeserializer<Ranking>)valueSerde.deserializer());
57 TestData.writeInputData(new BiConsumer<Key, Counter>()
59 private Instant timestamp = Instant.now();
62 public void accept(Key key, Counter value)
66 Entry.of(value.getWord(), value.getCounter()),
68 timestamp = timestamp.plusMillis(500);
72 List<KeyValue<String, Ranking>> receivedMessages = out
78 "OUT: {} -> {}, {}, {}",
81 parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
82 parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
83 return KeyValue.pair(record.key(), record.value());
87 TestData.assertExpectedResult(receivedMessages);