1 package de.juplo.kafka.wordcount.counter;
3 import de.juplo.kafka.wordcount.splitter.TestInputWord;
4 import de.juplo.kafka.wordcount.top10.TestOutputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
6 import org.apache.kafka.streams.KeyValue;
7 import org.springframework.util.LinkedMultiValueMap;
8 import org.springframework.util.MultiValueMap;
10 import java.util.function.Consumer;
11 import java.util.stream.Stream;
13 import static org.assertj.core.api.Assertions.assertThat;
// Names shared by the input fixture and the expected-output fixture.
// They are passed as the first argument of TestInputWord.of(..),
// TestOutputWord.of(..) and TestOutputWordCounter.of(..)
// (presumably the user a word belongs to -- confirm against those types).
18 static final String PETER = "peter";
19 static final String KLAUS = "klaus";
// Input fixture: eleven messages whose values are built with TestInputWord.of(name, word).
// It is exposed to tests as a stream by getInputMessages().
// The order is significant: EXPECTED_MESSAGES lists one expected counter per
// entry here, in the same order.
// Java cannot create generic arrays, hence the raw `new KeyValue[]`
// (an unchecked conversion on assignment).
21 private static final KeyValue<String, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
25 TestInputWord.of(PETER, "Hallo")),
28 TestInputWord.of(KLAUS, "Müsch")),
31 TestInputWord.of(PETER, "Welt")),
34 TestInputWord.of(KLAUS, "Müsch")),
37 TestInputWord.of(KLAUS, "s")),
40 TestInputWord.of(PETER, "Boäh")),
43 TestInputWord.of(PETER, "Welt")),
46 TestInputWord.of(PETER, "Boäh")),
49 TestInputWord.of(KLAUS, "s")),
52 TestInputWord.of(PETER, "Boäh")),
55 TestInputWord.of(KLAUS, "s")),
58 static Stream<KeyValue<String, TestInputWord>> getInputMessages()
60 return Stream.of(TestData.INPUT_MESSAGES);
63 static Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> expectedMessagesAssertion()
65 return receivedMessages -> assertExpectedMessages(receivedMessages);
68 static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
70 expectedMessages().forEach(
72 assertThat(receivedMessages.get(word))
73 .containsExactlyElementsOf(counter));
// Expected-output fixture: one (TestOutputWord, TestOutputWordCounter) pair per
// entry of INPUT_MESSAGES, in the same order.
// The third argument of TestOutputWordCounter.of(..) rises by one with every
// repetition of the same (name, word) combination: the n-th occurrence carries n.
// expectedMessages() groups these pairs by key.
// Java cannot create generic arrays, hence the raw `new KeyValue[]`
// (an unchecked conversion on assignment).
76 private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
79 TestOutputWord.of(PETER, "Hallo"),
80 TestOutputWordCounter.of(PETER, "Hallo",1)),
82 TestOutputWord.of(KLAUS, "Müsch"),
83 TestOutputWordCounter.of(KLAUS, "Müsch",1)),
85 TestOutputWord.of(PETER, "Welt"),
86 TestOutputWordCounter.of(PETER, "Welt",1)),
88 TestOutputWord.of(KLAUS, "Müsch"),
89 TestOutputWordCounter.of(KLAUS, "Müsch",2)),
91 TestOutputWord.of(KLAUS, "s"),
92 TestOutputWordCounter.of(KLAUS, "s",1)),
94 TestOutputWord.of(PETER, "Boäh"),
95 TestOutputWordCounter.of(PETER, "Boäh",1)),
97 TestOutputWord.of(PETER, "Welt"),
98 TestOutputWordCounter.of(PETER, "Welt",2)),
100 TestOutputWord.of(PETER, "Boäh"),
101 TestOutputWordCounter.of(PETER, "Boäh",2)),
103 TestOutputWord.of(KLAUS, "s"),
104 TestOutputWordCounter.of(KLAUS, "s",2)),
106 TestOutputWord.of(PETER, "Boäh"),
107 TestOutputWordCounter.of(PETER, "Boäh",3)),
109 TestOutputWord.of(KLAUS, "s"),
110 TestOutputWordCounter.of(KLAUS, "s",3)),
113 static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
115 MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
117 .of(EXPECTED_MESSAGES)
118 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
119 return expectedMessages;