props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName());
- props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
+ props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Key.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Counter.class.getName());
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.util.Map;
import java.util.Properties;
-import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
-import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
{
Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
- Top10ApplicationConfiguriation applicationConfiguriation =
- new Top10ApplicationConfiguriation();
+ Top10ApplicationConfiguration applicationConfiguriation =
+ new Top10ApplicationConfiguration();
Properties streamProcessorProperties =
- applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
+ applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
JsonSerde<?> keySerde = new JsonSerde<>();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
- TestInputTopic<String, Word> in = testDriver.createInputTopic(
+ TestInputTopic<Key, Counter> in = testDriver.createInputTopic(
IN,
- (JsonSerializer<String>)keySerde.serializer(),
- (JsonSerializer<Word>)valueSerde.serializer());
+ (JsonSerializer<Key>)keySerde.serializer(),
+ (JsonSerializer<Counter>)valueSerde.serializer());
- TestOutputTopic<Word, WordCounter> out = testDriver.createOutputTopic(
+ TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
OUT,
- (JsonDeserializer<Word>)keySerde.deserializer(),
- (JsonDeserializer<WordCounter>)valueSerde.deserializer());
+ (JsonDeserializer<String>)keySerde.deserializer(),
+ (JsonDeserializer<Ranking>)valueSerde.deserializer());
TestData.writeInputData((key, value) -> in.pipeInput(key, value));
- List<KeyValue<Word, WordCounter>> receivedMessages = out
+ List<KeyValue<String, Ranking>> receivedMessages = out
.readRecordsToList()
.stream()
.map(record ->