1 package de.juplo.kafka.wordcount.splitter;
3 import de.juplo.kafka.wordcount.counter.TestWord;
4 import org.apache.kafka.streams.KeyValue;
5 import org.springframework.util.LinkedMultiValueMap;
6 import org.springframework.util.MultiValueMap;
8 import java.time.Duration;
9 import java.util.function.BiConsumer;
10 import java.util.stream.Stream;
12 import static org.assertj.core.api.Assertions.assertThat;
13 import static org.awaitility.Awaitility.await;
18 static void writeInputData(BiConsumer<String, Recording> consumer)
22 recording = new Recording();
23 recording.setUser("peter");
24 recording.setSentence("Hallo Welt!");
25 consumer.accept(recording.getUser(), recording);
27 recording = new Recording();
28 recording.setUser("klaus");
29 recording.setSentence("Müsch gäb's auch!");
30 consumer.accept(recording.getUser(), recording);
32 recording = new Recording();
33 recording.setUser("peter");
34 recording.setSentence("Boäh, echt! ß mal nä Nümmäh!");
35 consumer.accept(recording.getUser(), recording);
38 static void assertExpectedMessages(MultiValueMap<String, TestWord> receivedMessages)
40 MultiValueMap<String, TestWord> expected = new LinkedMultiValueMap<>();
41 expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value));
42 await("Received expected messages")
43 .atMost(Duration.ofSeconds(5))
44 .untilAsserted(() -> expected.forEach((user, word) ->
45 assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
48 static Stream<KeyValue<String, TestWord>> expectedMessages = Stream.of(
51 TestWord.of("peter", "Hallo")),
54 TestWord.of("peter", "Welt")),
57 TestWord.of("klaus", "Müsch")),
60 TestWord.of("klaus", "gäb")),
63 TestWord.of("klaus", "s")),
66 TestWord.of("klaus", "auch")),
69 TestWord.of("peter", "Boäh")),
72 TestWord.of("peter", "echt")),
75 TestWord.of("peter", "ß")),
78 TestWord.of("peter", "mal")),
81 TestWord.of("peter", "nä")),
84 TestWord.of("peter", "Nümmäh")));