counter: 1.2.15 - Inlined the asserting ``Consumer``s
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / TestData.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import de.juplo.kafka.wordcount.splitter.TestInputWord;
4 import de.juplo.kafka.wordcount.top10.TestOutputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
6 import org.apache.kafka.streams.KeyValue;
7 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
8 import org.springframework.util.LinkedMultiValueMap;
9 import org.springframework.util.MultiValueMap;
10
11 import java.util.stream.Stream;
12
13 import static org.assertj.core.api.Assertions.assertThat;
14
15
16 class TestData
17 {
18         static final String PETER = "peter";
19         static final String KLAUS = "klaus";
20
21         static final String WORD_HALLO = "Hallo";
22         static final String WORD_MÜSCH = "Müsch";
23         static final String WORD_WELT = "Welt";
24         static final String WORD_S = "s";
25         static final String WORD_BOÄH = "Boäh";
26
27         static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
28         static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
29         static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
30         static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
31         static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
32
33         private static final KeyValue<String, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
34         {
35                         new KeyValue<>(
36                                         PETER,
37                                         TestInputWord.of(PETER, WORD_HALLO)),
38                         new KeyValue<>(
39                                         KLAUS,
40                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
41                         new KeyValue<>(
42                                         PETER,
43                                         TestInputWord.of(PETER, WORD_WELT)),
44                         new KeyValue<>(
45                                         KLAUS,
46                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
47                         new KeyValue<>(
48                                         KLAUS,
49                                         TestInputWord.of(KLAUS, WORD_S)),
50                         new KeyValue<>(
51                                         PETER,
52                                         TestInputWord.of(PETER, WORD_BOÄH)),
53                         new KeyValue<>(
54                                         PETER,
55                                         TestInputWord.of(PETER, WORD_WELT)),
56                         new KeyValue<>(
57                                         PETER,
58                                         TestInputWord.of(PETER, WORD_BOÄH)),
59                         new KeyValue<>(
60                                         KLAUS,
61                                         TestInputWord.of(KLAUS, WORD_S)),
62                         new KeyValue<>(
63                                         PETER,
64                                         TestInputWord.of(PETER, WORD_BOÄH)),
65                         new KeyValue<>(
66                                         KLAUS,
67                                         TestInputWord.of(KLAUS, WORD_S)),
68         };
69
70         static Stream<KeyValue<String, TestInputWord>> getInputMessages()
71         {
72                 return Stream.of(TestData.INPUT_MESSAGES);
73         }
74
75         static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
76         {
77                 expectedMessages().forEach(
78                                 (word, counter) ->
79                                                 assertThat(receivedMessages.get(word))
80                                                                 .containsExactlyElementsOf(counter));
81         }
82
83         static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
84         {
85                 assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
86                 assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
87                 assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
88                 assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
89                 assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
90         }
91
92         private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
93         {
94                 return messagesForUsers.get(word).size();
95         }
96
97         static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
98         {
99                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
100                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
101                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
102                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
103                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
104         }
105
106         private static Word wordOf(TestOutputWord testOutputWord)
107         {
108                 Word word = new Word();
109
110                 word.setUser(testOutputWord.getUser());
111                 word.setWord(testOutputWord.getWord());
112
113                 return word;
114         }
115
116         static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
117         {
118                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
119                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
120                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
121                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
122                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
123         }
124
125         private static void assertWordCountEqualsWordCountFromLastMessage(
126                         TestOutputWord word,
127                         Long counter)
128         {
129                 TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
130                                 word.getUser(),
131                                 word.getWord(),
132                                 counter);
133                 assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
134         }
135
136         private static void assertWordCountEqualsWordCountFromLastMessage(
137                         TestOutputWord word,
138                         TestOutputWordCounter counter)
139         {
140                 assertThat(counter).isEqualTo(getLastMessageFor(word));
141         }
142
143         private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
144         {
145                 return getLastMessageFor(word, expectedMessages());
146         }
147
148         private static TestOutputWordCounter getLastMessageFor(
149                         TestOutputWord user,
150                         MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
151         {
152                 return messagesForWord
153                                 .get(user)
154                                 .stream()
155                                 .reduce(null, (left, right) -> right);
156         }
157
158         private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
159         {
160                         KeyValue.pair(
161                                         PETER_HALLO,
162                                         TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
163                         KeyValue.pair(
164                                         KLAUS_MÜSCH,
165                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
166                         KeyValue.pair(
167                                         PETER_WELT,
168                                         TestOutputWordCounter.of(PETER, WORD_WELT,1)),
169                         KeyValue.pair(
170                                         KLAUS_MÜSCH,
171                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
172                         KeyValue.pair(
173                                         KLAUS_S,
174                                         TestOutputWordCounter.of(KLAUS, WORD_S,1)),
175                         KeyValue.pair(
176                                         PETER_BOÄH,
177                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
178                         KeyValue.pair(
179                                         PETER_WELT,
180                                         TestOutputWordCounter.of(PETER, WORD_WELT,2)),
181                         KeyValue.pair(
182                                         PETER_BOÄH,
183                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
184                         KeyValue.pair(
185                                         KLAUS_S,
186                                         TestOutputWordCounter.of(KLAUS, WORD_S,2)),
187                         KeyValue.pair(
188                                         PETER_BOÄH,
189                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
190                         KeyValue.pair(
191                                         KLAUS_S,
192                                         TestOutputWordCounter.of(KLAUS, WORD_S,3)),
193         };
194
195         static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
196         {
197                 MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
198                 Stream
199                                 .of(EXPECTED_MESSAGES)
200                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
201                 return expectedMessages;
202         }
203 }