X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterStreamProcessorTopologyTest.java;h=4b67052e3cfcaba8aaff143a2df0d03aba1a5570;hb=237133719b1d06d542302fb948d9cf3aff80a8a4;hp=955d7a063c695f70bc753054f51ac14a69855eb2;hpb=7efbc8c73e3f2bb1f553a61325b8576886bab254;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 955d7a0..4b67052 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -64,7 +64,7 @@ public class CounterStreamProcessorTopologyTest { TestData .getInputMessages() - .forEach(word -> in.pipeInput(word.getUser(), word)); + .forEach(kv -> in.pipeInput(kv.key, kv.value)); MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); out @@ -72,6 +72,9 @@ public class CounterStreamProcessorTopologyTest .forEach(record -> receivedMessages.add(record.key(), record.value())); TestData.assertExpectedMessages(receivedMessages); + + TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); + TestData.assertExpectedLastMessagesForWord(receivedMessages); } @AfterEach