TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
- TestInputTopic<Key, Counter> in = testDriver.createInputTopic(
+ TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
IN,
(JsonSerializer<Key>)keySerde.serializer(),
- (JsonSerializer<Counter>)valueSerde.serializer());
+ (JsonSerializer<Entry>)valueSerde.serializer());
TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
OUT,
(JsonDeserializer<String>)keySerde.deserializer(),
(JsonDeserializer<Ranking>)valueSerde.deserializer());
- TestData.writeInputData((key, value) -> in.pipeInput(key, value));
+ TestData.writeInputData((key, value) -> in.pipeInput(
+ key,
+ Entry.of(value.getWord(), value.getCounter())));
List<KeyValue<String, Ranking>> receivedMessages = out
.readRecordsToList()