import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
TopologyTestDriver testDriver;
- TestInputTopic<Key, Entry> in;
- TestOutputTopic<User, Ranking> out;
+ TestInputTopic<TestWord, TestCounter> in;
+ TestOutputTopic<TestUser, TestRanking> out;
@BeforeEach
in = testDriver.createInputTopic(
IN,
- jsonSerializer(Key.class, true),
- jsonSerializer(Entry.class,false));
+ jsonSerializer(TestWord.class, true),
+ jsonSerializer(TestCounter.class,false));
out = testDriver.createOutputTopic(
OUT,
new JsonDeserializer()
- .copyWithType(User.class)
+ .copyWithType(TestUser.class)
.ignoreTypeHeaders(),
new JsonDeserializer()
- .copyWithType(Ranking.class)
+ .copyWithType(TestRanking.class)
.ignoreTypeHeaders());
}
{
Stream
.of(TestData.INPUT_MESSAGES)
- .forEach(kv -> in.pipeInput(
- Key.of(kv.key.getUser(), kv.key.getWord()),
- Entry.of(kv.value.getWord(), kv.value.getCounter())));
+ .forEach(kv -> in.pipeInput(kv.key, kv.value));
- MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
out
.readRecordsToList()
.forEach(record -> receivedMessages.add(record.key(), record.value()));