counter: 1.2.15 - Added assertion for the expected final output messages
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / TestData.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import de.juplo.kafka.wordcount.splitter.TestInputWord;
4 import de.juplo.kafka.wordcount.top10.TestOutputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
6 import org.apache.kafka.streams.KeyValue;
7 import org.springframework.util.LinkedMultiValueMap;
8 import org.springframework.util.MultiValueMap;
9
10 import java.util.function.Consumer;
11 import java.util.stream.Stream;
12
13 import static org.assertj.core.api.Assertions.assertThat;
14
15
16 class TestData
17 {
18         static final String PETER = "peter";
19         static final String KLAUS = "klaus";
20
21         static final String WORD_HALLO = "Hallo";
22         static final String WORD_MÜSCH = "Müsch";
23         static final String WORD_WELT = "Welt";
24         static final String WORD_S = "s";
25         static final String WORD_BOÄH = "Boäh";
26
27         static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
28         static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
29         static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
30         static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
31         static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
32
33         private static final KeyValue<String, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
34         {
35                         new KeyValue<>(
36                                         PETER,
37                                         TestInputWord.of(PETER, WORD_HALLO)),
38                         new KeyValue<>(
39                                         KLAUS,
40                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
41                         new KeyValue<>(
42                                         PETER,
43                                         TestInputWord.of(PETER, WORD_WELT)),
44                         new KeyValue<>(
45                                         KLAUS,
46                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
47                         new KeyValue<>(
48                                         KLAUS,
49                                         TestInputWord.of(KLAUS, WORD_S)),
50                         new KeyValue<>(
51                                         PETER,
52                                         TestInputWord.of(PETER, WORD_BOÄH)),
53                         new KeyValue<>(
54                                         PETER,
55                                         TestInputWord.of(PETER, WORD_WELT)),
56                         new KeyValue<>(
57                                         PETER,
58                                         TestInputWord.of(PETER, WORD_BOÄH)),
59                         new KeyValue<>(
60                                         KLAUS,
61                                         TestInputWord.of(KLAUS, WORD_S)),
62                         new KeyValue<>(
63                                         PETER,
64                                         TestInputWord.of(PETER, WORD_BOÄH)),
65                         new KeyValue<>(
66                                         KLAUS,
67                                         TestInputWord.of(KLAUS, WORD_S)),
68         };
69
70         static Stream<KeyValue<String, TestInputWord>> getInputMessages()
71         {
72                 return Stream.of(TestData.INPUT_MESSAGES);
73         }
74
75         static Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> expectedMessagesAssertion()
76         {
77                 return receivedMessages -> assertExpectedMessages(receivedMessages);
78         }
79
80         static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
81         {
82                 expectedMessages().forEach(
83                                 (word, counter) ->
84                                                 assertThat(receivedMessages.get(word))
85                                                                 .containsExactlyElementsOf(counter));
86         }
87
88         static Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> expectedNumberOfMessagesForWordAssertion()
89         {
90                 return receivedMessages -> assertExpectedNumberOfMessagesForWord(receivedMessages);
91         }
92
93         static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
94         {
95                 assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
96                 assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
97                 assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
98                 assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
99                 assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
100         }
101
102         private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
103         {
104                 return messagesForUsers.get(word).size();
105         }
106
107         static Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> expectedLastMessagesForWordAssertion()
108         {
109                 return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages);
110         }
111
112         static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
113         {
114                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
115                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
116                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
117                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
118                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
119         }
120
121         private static void assertWordCountEqualsWordCountFromLastMessage(
122                         TestOutputWord word,
123                         TestOutputWordCounter counter)
124         {
125                 assertThat(counter).isEqualTo(getLastMessageFor(word));
126         }
127
128         private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
129         {
130                 return getLastMessageFor(word, expectedMessages());
131         }
132
133         private static TestOutputWordCounter getLastMessageFor(
134                         TestOutputWord user,
135                         MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
136         {
137                 return messagesForWord
138                                 .get(user)
139                                 .stream()
140                                 .reduce(null, (left, right) -> right);
141         }
142
143         private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
144         {
145                         KeyValue.pair(
146                                         PETER_HALLO,
147                                         TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
148                         KeyValue.pair(
149                                         KLAUS_MÜSCH,
150                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
151                         KeyValue.pair(
152                                         PETER_WELT,
153                                         TestOutputWordCounter.of(PETER, WORD_WELT,1)),
154                         KeyValue.pair(
155                                         KLAUS_MÜSCH,
156                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
157                         KeyValue.pair(
158                                         KLAUS_S,
159                                         TestOutputWordCounter.of(KLAUS, WORD_S,1)),
160                         KeyValue.pair(
161                                         PETER_BOÄH,
162                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
163                         KeyValue.pair(
164                                         PETER_WELT,
165                                         TestOutputWordCounter.of(PETER, WORD_WELT,2)),
166                         KeyValue.pair(
167                                         PETER_BOÄH,
168                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
169                         KeyValue.pair(
170                                         KLAUS_S,
171                                         TestOutputWordCounter.of(KLAUS, WORD_S,2)),
172                         KeyValue.pair(
173                                         PETER_BOÄH,
174                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
175                         KeyValue.pair(
176                                         KLAUS_S,
177                                         TestOutputWordCounter.of(KLAUS, WORD_S,3)),
178         };
179
180         static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
181         {
182                 MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
183                 Stream
184                                 .of(EXPECTED_MESSAGES)
185                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
186                 return expectedMessages;
187         }
188 }