counter: 1.3.0 - (RED) Introduced domain-class `User` as key
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / TestData.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import de.juplo.kafka.wordcount.splitter.TestInputUser;
4 import de.juplo.kafka.wordcount.splitter.TestInputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWord;
6 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
11
12 import java.util.stream.Stream;
13
14 import static org.assertj.core.api.Assertions.assertThat;
15
16
17 class TestData
18 {
19         static final String PETER = "peter";
20         static final String KLAUS = "klaus";
21
22         static final String WORD_HALLO = "Hallo";
23         static final String WORD_MÜSCH = "Müsch";
24         static final String WORD_WELT = "Welt";
25         static final String WORD_S = "s";
26         static final String WORD_BOÄH = "Boäh";
27
28         static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
29         static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
30         static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
31         static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
32         static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
33
34         private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
35         {
36                         new KeyValue<>(
37                                         TestInputUser.of(PETER),
38                                         TestInputWord.of(PETER, WORD_HALLO)),
39                         new KeyValue<>(
40                                         TestInputUser.of(KLAUS),
41                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
42                         new KeyValue<>(
43                                         TestInputUser.of(PETER),
44                                         TestInputWord.of(PETER, WORD_WELT)),
45                         new KeyValue<>(
46                                         TestInputUser.of(KLAUS),
47                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
48                         new KeyValue<>(
49                                         TestInputUser.of(KLAUS),
50                                         TestInputWord.of(KLAUS, WORD_S)),
51                         new KeyValue<>(
52                                         TestInputUser.of(PETER),
53                                         TestInputWord.of(PETER, WORD_BOÄH)),
54                         new KeyValue<>(
55                                         TestInputUser.of(PETER),
56                                         TestInputWord.of(PETER, WORD_WELT)),
57                         new KeyValue<>(
58                                         TestInputUser.of(PETER),
59                                         TestInputWord.of(PETER, WORD_BOÄH)),
60                         new KeyValue<>(
61                                         TestInputUser.of(KLAUS),
62                                         TestInputWord.of(KLAUS, WORD_S)),
63                         new KeyValue<>(
64                                         TestInputUser.of(PETER),
65                                         TestInputWord.of(PETER, WORD_BOÄH)),
66                         new KeyValue<>(
67                                         TestInputUser.of(KLAUS),
68                                         TestInputWord.of(KLAUS, WORD_S)),
69         };
70
71         static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
72         {
73                 return Stream.of(TestData.INPUT_MESSAGES);
74         }
75
76         static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
77         {
78                 expectedMessages().forEach(
79                                 (word, counter) ->
80                                                 assertThat(receivedMessages.get(word))
81                                                                 .containsExactlyElementsOf(counter));
82         }
83
84         static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
85         {
86                 assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
87                 assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
88                 assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
89                 assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
90                 assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
91         }
92
93         private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
94         {
95                 return messagesForUsers.get(word).size();
96         }
97
98         static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
99         {
100                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
101                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
102                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
103                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
104                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
105         }
106
107         private static Word wordOf(TestOutputWord testOutputWord)
108         {
109                 Word word = new Word();
110
111                 word.setUser(testOutputWord.getUser());
112                 word.setWord(testOutputWord.getWord());
113
114                 return word;
115         }
116
117         static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
118         {
119                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
120                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
121                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
122                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
123                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
124         }
125
126         private static void assertWordCountEqualsWordCountFromLastMessage(
127                         TestOutputWord word,
128                         Long counter)
129         {
130                 TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
131                                 word.getUser(),
132                                 word.getWord(),
133                                 counter);
134                 assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
135         }
136
137         private static void assertWordCountEqualsWordCountFromLastMessage(
138                         TestOutputWord word,
139                         TestOutputWordCounter counter)
140         {
141                 assertThat(counter).isEqualTo(getLastMessageFor(word));
142         }
143
144         private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
145         {
146                 return getLastMessageFor(word, expectedMessages());
147         }
148
149         private static TestOutputWordCounter getLastMessageFor(
150                         TestOutputWord user,
151                         MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
152         {
153                 return messagesForWord
154                                 .get(user)
155                                 .stream()
156                                 .reduce(null, (left, right) -> right);
157         }
158
159         private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
160         {
161                         KeyValue.pair(
162                                         PETER_HALLO,
163                                         TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
164                         KeyValue.pair(
165                                         KLAUS_MÜSCH,
166                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
167                         KeyValue.pair(
168                                         PETER_WELT,
169                                         TestOutputWordCounter.of(PETER, WORD_WELT,1)),
170                         KeyValue.pair(
171                                         KLAUS_MÜSCH,
172                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
173                         KeyValue.pair(
174                                         KLAUS_S,
175                                         TestOutputWordCounter.of(KLAUS, WORD_S,1)),
176                         KeyValue.pair(
177                                         PETER_BOÄH,
178                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
179                         KeyValue.pair(
180                                         PETER_WELT,
181                                         TestOutputWordCounter.of(PETER, WORD_WELT,2)),
182                         KeyValue.pair(
183                                         PETER_BOÄH,
184                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
185                         KeyValue.pair(
186                                         KLAUS_S,
187                                         TestOutputWordCounter.of(KLAUS, WORD_S,2)),
188                         KeyValue.pair(
189                                         PETER_BOÄH,
190                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
191                         KeyValue.pair(
192                                         KLAUS_S,
193                                         TestOutputWordCounter.of(KLAUS, WORD_S,3)),
194         };
195
196         static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
197         {
198                 MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
199                 Stream
200                                 .of(EXPECTED_MESSAGES)
201                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
202                 return expectedMessages;
203         }
204 }