1 package de.juplo.kafka.wordcount.splitter;
3 import de.juplo.kafka.wordcount.counter.TestWord;
4 import org.apache.kafka.streams.KeyValue;
5 import org.springframework.util.LinkedMultiValueMap;
6 import org.springframework.util.MultiValueMap;
8 import java.time.Duration;
9 import java.util.function.BiConsumer;
10 import java.util.stream.Stream;
12 import static org.assertj.core.api.Assertions.assertThat;
13 import static org.awaitility.Awaitility.await;
18 static void writeInputData(BiConsumer<String, Recording> consumer)
22 recording = new Recording();
23 recording.setUser("peter");
24 recording.setSentence("Hallo Welt!");
25 consumer.accept(recording.getUser(), recording);
27 recording = new Recording();
28 recording.setUser("klaus");
29 recording.setSentence("Müsch gäb's auch!");
30 consumer.accept(recording.getUser(), recording);
32 recording = new Recording();
33 recording.setUser("peter");
34 recording.setSentence("Boäh, echt! ß mal nä Nümmäh!");
35 consumer.accept(recording.getUser(), recording);
38 static void assertExpectedMessages(MultiValueMap<String, TestWord> receivedMessages)
40 MultiValueMap<String, TestWord> expected = new LinkedMultiValueMap<>();
41 expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value));
42 await("Received expected messages")
43 .atMost(Duration.ofSeconds(5))
44 .untilAsserted(() -> expected.forEach((user, word) ->
45 assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
/**
 * The messages the test expects to receive, as key/value pairs whose values
 * are {@code TestWord} instances, listed in the order in which they are
 * expected.
 *
 * <p>NOTE: this is a {@code java.util.stream.Stream}, which supports only a
 * single traversal. Code that reads this field directly must not expect to
 * iterate it a second time.
 */
48 static Stream<KeyValue<String, TestWord>> expectedMessages = Stream.of(
51 new TestWord("peter", "Hallo")),
54 new TestWord("peter", "Welt")),
57 new TestWord("klaus", "Müsch")),
60 new TestWord("klaus", "gäb")),
63 new TestWord("klaus", "s")),
66 new TestWord("klaus", "auch")),
69 new TestWord("peter", "Boäh")),
72 new TestWord("peter", "echt")),
75 new TestWord("peter", "ß")),
78 new TestWord("peter", "mal")),
81 new TestWord("peter", "nä")),
84 new TestWord("peter", "Nümmäh")));