X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterStreamProcessorTopologyTest.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterStreamProcessorTopologyTest.java;h=8e09d0c6ff9bacaca359bf1ecf91600aec4c10dd;hb=48dabf093db63d517252f47b15f597e80852e9d4;hp=1b3e1e44415aaa617f90a97e37b5a19fcbe8bc86;hpb=7768a8989ee4c1a9340f2e2e396aa7335b37fb0a;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 1b3e1e4..8e09d0c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -17,6 +17,7 @@ import org.springframework.util.MultiValueMap; import java.util.Map; import java.util.Properties; +import java.util.stream.Stream; import static de.juplo.kafka.wordcount.counter.TestData.convertToMap; import static de.juplo.kafka.wordcount.counter.TestData.parseHeader; @@ -72,7 +73,9 @@ public class CounterStreamProcessorTopologyTest @Test public void test() { - TestData.injectInputMessages((key, value) -> in.pipeInput(key, value)); + Stream + .of(TestData.INPUT_MESSAGES) + .forEach(word -> in.pipeInput(word.getUser(), word)); MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); out