X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplicationIT.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplicationIT.java;h=adf4dde6471544d7f30482db1139706af0530199;hb=52831df8733700f29fd2e430d7a8912c26f22c08;hp=d1bbc0fb794280600901bc60ba70b6e7571c9788;hpb=d7fa29b35fef537b65e9110f708e781925ebff9f;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index d1bbc0f..adf4dde 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.splitter; import de.juplo.kafka.wordcount.counter.TestWord; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.KeyValue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -19,11 +18,8 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.function.BiConsumer; -import java.util.stream.Stream; import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*; -import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -59,87 +55,18 @@ public class SplitterApplicationIT consumer.received.clear(); } - static void writeInputData(BiConsumer consumer) - { - Recording recording; - - recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Hallo Welt!"); - consumer.accept(recording.getUser(), recording); - - recording = new Recording(); - recording.setUser("klaus"); - recording.setSentence("Müsch gäb's auch!"); - consumer.accept(recording.getUser(), recording); - - recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Boäh, echt! ß mal nä Nümmäh!"); - consumer.accept(recording.getUser(), recording); - } - @Test void testSendMessage() throws Exception { - writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); + TestData.writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); await("Expexted converted data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> - assertExpectedMessages(consumer.getReceivedMessages())); + TestData.assertExpectedMessages(consumer.getReceivedMessages())); } - static void assertExpectedMessages(MultiValueMap receivedMessages) - { - MultiValueMap expected = new LinkedMultiValueMap<>(); - expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value)); - await("Received expected messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> expected.forEach((user, word) -> - assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); - } - - static Stream> expectedMessages = Stream.of( - KeyValue.pair( - "peter", - new TestWord("peter", "Hallo")), - KeyValue.pair( - "peter", - new TestWord("peter", "Welt")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "Müsch")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "gäb")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "s")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "auch")), - KeyValue.pair( - "peter", - new TestWord("peter", "Boäh")), - KeyValue.pair( - "peter", - new TestWord("peter", "echt")), - KeyValue.pair( - "peter", - new TestWord("peter", "ß")), - KeyValue.pair( - "peter", - new TestWord("peter", "mal")), - KeyValue.pair( - "peter", - new TestWord("peter", "nä")), - KeyValue.pair( - "peter", - new TestWord("peter", "Nümmäh"))); - - static class Consumer { private final MultiValueMap received = new LinkedMultiValueMap<>();