X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FTestData.java;h=8c343d5e6d00136de545568b5d2705b24a7a3a55;hb=5283e1688c50764d828f83f7d58d4cf1c7e56cf9;hp=00ea4c4846e7811df0a9647c9f600f340507a2fd;hpb=52831df8733700f29fd2e430d7a8912c26f22c08;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java index 00ea4c4..8c343d5 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -1,12 +1,12 @@ package de.juplo.kafka.wordcount.splitter; import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.recorder.TestRecording; import org.apache.kafka.streams.KeyValue; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.function.BiConsumer; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -15,71 +15,77 @@ import static org.awaitility.Awaitility.await; public class TestData { - static void writeInputData(BiConsumer consumer) - { - Recording recording; - - recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Hallo Welt!"); - consumer.accept(recording.getUser(), recording); + static final String PETER = "peter"; + static final String KLAUS = "klaus"; - recording = new Recording(); - recording.setUser("klaus"); - recording.setSentence("Müsch gäb's auch!"); - consumer.accept(recording.getUser(), recording); - recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Boäh, echt! ß mal nä Nümmäh!"); - consumer.accept(recording.getUser(), recording); - } + static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + { + new KeyValue<>( + PETER, + TestRecording.of(PETER, "Hallo Welt!")), + new KeyValue<>( + KLAUS, + TestRecording.of(KLAUS, "Müsch gäb's auch!")), + new KeyValue<>( + PETER, + TestRecording.of(PETER, "Boäh, echt! ß mal nä Nümmäh!")), + }; static void assertExpectedMessages(MultiValueMap receivedMessages) { - MultiValueMap expected = new LinkedMultiValueMap<>(); - expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value)); await("Received expected messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> expected.forEach((user, word) -> + .untilAsserted(() -> expectedMessages().forEach((user, word) -> assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); } - static Stream> expectedMessages = Stream.of( + static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + { KeyValue.pair( "peter", - new TestWord("peter", "Hallo")), + TestWord.of("peter", "Hallo")), KeyValue.pair( "peter", - new TestWord("peter", "Welt")), + TestWord.of("peter", "Welt")), KeyValue.pair( "klaus", - new TestWord("klaus", "Müsch")), + TestWord.of("klaus", "Müsch")), KeyValue.pair( "klaus", - new TestWord("klaus", "gäb")), + TestWord.of("klaus", "gäb")), KeyValue.pair( "klaus", - new TestWord("klaus", "s")), + TestWord.of("klaus", "s")), KeyValue.pair( "klaus", - new TestWord("klaus", "auch")), + TestWord.of("klaus", "auch")), KeyValue.pair( "peter", - new TestWord("peter", "Boäh")), + TestWord.of("peter", "Boäh")), KeyValue.pair( "peter", - new TestWord("peter", "echt")), + TestWord.of("peter", "echt")), KeyValue.pair( "peter", - new TestWord("peter", "ß")), + TestWord.of("peter", "ß")), KeyValue.pair( "peter", - new TestWord("peter", "mal")), + TestWord.of("peter", "mal")), KeyValue.pair( "peter", - new TestWord("peter", "nä")), + TestWord.of("peter", "nä")), KeyValue.pair( "peter", - new TestWord("peter", "Nümmäh"))); + TestWord.of("peter", "Nümmäh")), + }; + + static MultiValueMap expectedMessages() + { + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + Stream + .of(EXPECTED_MESSAGES) + .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); + return expectedMessages; + } }