counter: 1.2.15 - Added assertion for the expected state
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / TestData.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import de.juplo.kafka.wordcount.splitter.TestInputWord;
4 import de.juplo.kafka.wordcount.top10.TestOutputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
6 import org.apache.kafka.streams.KeyValue;
7 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
8 import org.springframework.util.LinkedMultiValueMap;
9 import org.springframework.util.MultiValueMap;
10
11 import java.util.function.Consumer;
12 import java.util.stream.Stream;
13
14 import static org.assertj.core.api.Assertions.assertThat;
15
16
17 class TestData
18 {
19         static final String PETER = "peter";
20         static final String KLAUS = "klaus";
21
22         static final String WORD_HALLO = "Hallo";
23         static final String WORD_MÜSCH = "Müsch";
24         static final String WORD_WELT = "Welt";
25         static final String WORD_S = "s";
26         static final String WORD_BOÄH = "Boäh";
27
28         static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
29         static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
30         static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
31         static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
32         static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
33
34         private static final KeyValue<String, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
35         {
36                         new KeyValue<>(
37                                         PETER,
38                                         TestInputWord.of(PETER, WORD_HALLO)),
39                         new KeyValue<>(
40                                         KLAUS,
41                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
42                         new KeyValue<>(
43                                         PETER,
44                                         TestInputWord.of(PETER, WORD_WELT)),
45                         new KeyValue<>(
46                                         KLAUS,
47                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
48                         new KeyValue<>(
49                                         KLAUS,
50                                         TestInputWord.of(KLAUS, WORD_S)),
51                         new KeyValue<>(
52                                         PETER,
53                                         TestInputWord.of(PETER, WORD_BOÄH)),
54                         new KeyValue<>(
55                                         PETER,
56                                         TestInputWord.of(PETER, WORD_WELT)),
57                         new KeyValue<>(
58                                         PETER,
59                                         TestInputWord.of(PETER, WORD_BOÄH)),
60                         new KeyValue<>(
61                                         KLAUS,
62                                         TestInputWord.of(KLAUS, WORD_S)),
63                         new KeyValue<>(
64                                         PETER,
65                                         TestInputWord.of(PETER, WORD_BOÄH)),
66                         new KeyValue<>(
67                                         KLAUS,
68                                         TestInputWord.of(KLAUS, WORD_S)),
69         };
70
71         static Stream<KeyValue<String, TestInputWord>> getInputMessages()
72         {
73                 return Stream.of(TestData.INPUT_MESSAGES);
74         }
75
76         static Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> expectedMessagesAssertion()
77         {
78                 return receivedMessages -> assertExpectedMessages(receivedMessages);
79         }
80
81         static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
82         {
83                 expectedMessages().forEach(
84                                 (word, counter) ->
85                                                 assertThat(receivedMessages.get(word))
86                                                                 .containsExactlyElementsOf(counter));
87         }
88
89         static Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> expectedNumberOfMessagesForWordAssertion()
90         {
91                 return receivedMessages -> assertExpectedNumberOfMessagesForWord(receivedMessages);
92         }
93
94         static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
95         {
96                 assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
97                 assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
98                 assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
99                 assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
100                 assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
101         }
102
103         private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
104         {
105                 return messagesForUsers.get(word).size();
106         }
107
108         static Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> expectedLastMessagesForWordAssertion()
109         {
110                 return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages);
111         }
112
113         static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
114         {
115                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
116                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
117                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
118                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
119                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
120         }
121
122         private static Word wordOf(TestOutputWord testOutputWord)
123         {
124                 Word word = new Word();
125
126                 word.setUser(testOutputWord.getUser());
127                 word.setWord(testOutputWord.getWord());
128
129                 return word;
130         }
131
132         static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
133         {
134                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
135                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
136                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
137                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
138                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
139         }
140
141         private static void assertWordCountEqualsWordCountFromLastMessage(
142                         TestOutputWord word,
143                         Long counter)
144         {
145                 TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
146                                 word.getUser(),
147                                 word.getWord(),
148                                 counter);
149                 assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
150         }
151
152         private static void assertWordCountEqualsWordCountFromLastMessage(
153                         TestOutputWord word,
154                         TestOutputWordCounter counter)
155         {
156                 assertThat(counter).isEqualTo(getLastMessageFor(word));
157         }
158
159         private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
160         {
161                 return getLastMessageFor(word, expectedMessages());
162         }
163
164         private static TestOutputWordCounter getLastMessageFor(
165                         TestOutputWord user,
166                         MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
167         {
168                 return messagesForWord
169                                 .get(user)
170                                 .stream()
171                                 .reduce(null, (left, right) -> right);
172         }
173
174         private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
175         {
176                         KeyValue.pair(
177                                         PETER_HALLO,
178                                         TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
179                         KeyValue.pair(
180                                         KLAUS_MÜSCH,
181                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
182                         KeyValue.pair(
183                                         PETER_WELT,
184                                         TestOutputWordCounter.of(PETER, WORD_WELT,1)),
185                         KeyValue.pair(
186                                         KLAUS_MÜSCH,
187                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
188                         KeyValue.pair(
189                                         KLAUS_S,
190                                         TestOutputWordCounter.of(KLAUS, WORD_S,1)),
191                         KeyValue.pair(
192                                         PETER_BOÄH,
193                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
194                         KeyValue.pair(
195                                         PETER_WELT,
196                                         TestOutputWordCounter.of(PETER, WORD_WELT,2)),
197                         KeyValue.pair(
198                                         PETER_BOÄH,
199                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
200                         KeyValue.pair(
201                                         KLAUS_S,
202                                         TestOutputWordCounter.of(KLAUS, WORD_S,2)),
203                         KeyValue.pair(
204                                         PETER_BOÄH,
205                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
206                         KeyValue.pair(
207                                         KLAUS_S,
208                                         TestOutputWordCounter.of(KLAUS, WORD_S,3)),
209         };
210
211         static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
212         {
213                 MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
214                 Stream
215                                 .of(EXPECTED_MESSAGES)
216                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
217                 return expectedMessages;
218         }
219 }