import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.util.Map;
import java.util.Properties;
-import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
-import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
@Test
public void test()
{
- Topology topology = CounterStreamProcessor.buildTopology(
- IN,
- OUT,
- Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
+ Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
- CounterApplicationConfiguriation applicationConfiguriation =
- new CounterApplicationConfiguriation();
+ Top10ApplicationConfiguration applicationConfiguriation =
+ new Top10ApplicationConfiguration();
Properties streamProcessorProperties =
- applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
+ applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
JsonSerde<?> keySerde = new JsonSerde<>();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
- TestInputTopic<String, Word> in = testDriver.createInputTopic(
+ TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
IN,
- (JsonSerializer<String>)keySerde.serializer(),
- (JsonSerializer<Word>)valueSerde.serializer());
+ (JsonSerializer<Key>)keySerde.serializer(),
+ (JsonSerializer<Entry>)valueSerde.serializer());
- TestOutputTopic<Word, WordCounter> out = testDriver.createOutputTopic(
+ TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
OUT,
- (JsonDeserializer<Word>)keySerde.deserializer(),
- (JsonDeserializer<WordCounter>)valueSerde.deserializer());
+ (JsonDeserializer<String>)keySerde.deserializer(),
+ (JsonDeserializer<Ranking>)valueSerde.deserializer());
- TestData.writeInputData((key, value) -> in.pipeInput(key, value));
+ TestData.writeInputData((key, value) -> in.pipeInput(
+ key,
+ Entry.of(value.getWord(), value.getCounter())));
- List<KeyValue<Word, WordCounter>> receivedMessages = out
+ List<KeyValue<String, Ranking>> receivedMessages = out
.readRecordsToList()
.stream()
.map(record ->