1 package de.juplo.kafka.wordcount.splitter;
3 import de.juplo.kafka.wordcount.counter.TestOutputUser;
4 import de.juplo.kafka.wordcount.counter.TestOutputWord;
5 import de.juplo.kafka.wordcount.recorder.TestInputRecording;
6 import de.juplo.kafka.wordcount.recorder.TestInputUser;
7 import org.apache.kafka.streams.KeyValue;
8 import org.springframework.util.LinkedMultiValueMap;
9 import org.springframework.util.MultiValueMap;
11 import java.time.Duration;
12 import java.util.stream.Stream;
14 import static org.assertj.core.api.Assertions.assertThat;
15 import static org.awaitility.Awaitility.await;
// Fixture user created from the name "peter"; PETER.getUser() is reused by the
// input recordings and by the expected output keys/words in this class.
20 static final TestInputUser PETER = TestInputUser.of("peter");
// Fixture user created from the name "klaus"; KLAUS.getUser() is reused by the
// input recordings and by the expected output keys/words in this class.
21 static final TestInputUser KLAUS = TestInputUser.of("klaus");
24 static final Stream<KeyValue<TestInputUser, TestInputRecording>> getInputMessages()
26 return Stream.of(INPUT_MESSAGES);
// Input fixtures handed out by getInputMessages(): three recordings, two for
// PETER and one for KLAUS, containing punctuation, an apostrophe, umlauts and "ß".
// The raw `new KeyValue[]` is required because Java does not allow creating an
// array of a parameterized type.
// NOTE(review): the key and the KeyValue construction of each entry are not
// visible in this excerpt — confirm them against the complete file.
29 private static final KeyValue<TestInputUser, TestInputRecording>[] INPUT_MESSAGES = new KeyValue[]
33 TestInputRecording.of(PETER.getUser(), "Hallo Welt!")),
36 TestInputRecording.of(KLAUS.getUser(), "Müsch gäb's auch!")),
39 TestInputRecording.of(PETER.getUser(), "Boäh, echt! ß mal nä Nümmäh!")),
42 static void assertExpectedMessages(MultiValueMap<TestOutputUser, TestOutputWord> receivedMessages)
44 await("Received expected messages")
45 .atMost(Duration.ofSeconds(5))
46 .untilAsserted(() -> expectedMessages().forEach((user, word) ->
47 assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
// Expected output fixtures, consumed by expectedMessages(): one entry per word,
// keyed by the user the word belongs to.
// Listed order: PETER "Hallo", "Welt"; KLAUS "Müsch", "gäb", "s", "auch";
// PETER "Boäh", "echt", "ß", "mal", "nä", "Nümmäh".
// The expected words carry no punctuation, and the input "gäb's" shows up as
// the two separate words "gäb" and "s".
// The raw `new KeyValue[]` is required because Java does not allow creating an
// array of a parameterized type.
// NOTE(review): the call that wraps each user/word pair into a KeyValue is not
// visible in this excerpt — confirm it against the complete file.
50 private static final KeyValue<TestOutputUser, TestOutputWord>[] EXPECTED_MESSAGES = new KeyValue[]
53 TestOutputUser.of(PETER.getUser()),
54 TestOutputWord.of(PETER.getUser(), "Hallo")),
56 TestOutputUser.of(PETER.getUser()),
57 TestOutputWord.of(PETER.getUser(), "Welt")),
59 TestOutputUser.of(KLAUS.getUser()),
60 TestOutputWord.of(KLAUS.getUser(), "Müsch")),
62 TestOutputUser.of(KLAUS.getUser()),
63 TestOutputWord.of(KLAUS.getUser(), "gäb")),
65 TestOutputUser.of(KLAUS.getUser()),
66 TestOutputWord.of(KLAUS.getUser(), "s")),
68 TestOutputUser.of(KLAUS.getUser()),
69 TestOutputWord.of(KLAUS.getUser(), "auch")),
71 TestOutputUser.of(PETER.getUser()),
72 TestOutputWord.of(PETER.getUser(), "Boäh")),
74 TestOutputUser.of(PETER.getUser()),
75 TestOutputWord.of(PETER.getUser(), "echt")),
77 TestOutputUser.of(PETER.getUser()),
78 TestOutputWord.of(PETER.getUser(), "ß")),
80 TestOutputUser.of(PETER.getUser()),
81 TestOutputWord.of(PETER.getUser(), "mal")),
83 TestOutputUser.of(PETER.getUser()),
84 TestOutputWord.of(PETER.getUser(), "nä")),
86 TestOutputUser.of(PETER.getUser()),
87 TestOutputWord.of(PETER.getUser(), "Nümmäh")),
90 static MultiValueMap<TestOutputUser, TestOutputWord> expectedMessages()
92 MultiValueMap<TestOutputUser, TestOutputWord> expectedMessages = new LinkedMultiValueMap<>();
94 .of(EXPECTED_MESSAGES)
95 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
96 return expectedMessages;