counter: 1.4.2 - RocksDB does not work in Alpine-Linux
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / TestData.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import de.juplo.kafka.wordcount.splitter.TestInputUser;
4 import de.juplo.kafka.wordcount.splitter.TestInputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWord;
6 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
11
12 import java.util.stream.Stream;
13
14 import static org.assertj.core.api.Assertions.assertThat;
15
16
17 class TestData
18 {
19         static final String PETER = "peter";
20         static final String KLAUS = "klaus";
21
22         static final String WORD_HALLO = "Hallo";
23         static final String WORD_MÜSCH = "Müsch";
24         static final String WORD_WELT = "Welt";
25         static final String WORD_S = "s";
26         static final String WORD_BOÄH = "Boäh";
27
28         static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
29         static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
30         static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
31         static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
32         static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
33
34         private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
35         {
36                         KeyValue.pair(
37                                         TestInputUser.of(PETER),
38                                         TestInputWord.of(PETER, WORD_HALLO)),
39                         KeyValue.pair(
40                                         TestInputUser.of(KLAUS),
41                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
42                         KeyValue.pair(
43                                         TestInputUser.of(PETER),
44                                         TestInputWord.of(PETER, WORD_WELT)),
45                         KeyValue.pair(
46                                         TestInputUser.of(KLAUS),
47                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
48                         KeyValue.pair(
49                                         TestInputUser.of(KLAUS),
50                                         TestInputWord.of(KLAUS, WORD_S)),
51                         KeyValue.pair(
52                                         TestInputUser.of(PETER),
53                                         TestInputWord.of(PETER, WORD_BOÄH)),
54                         KeyValue.pair(
55                                         TestInputUser.of(PETER),
56                                         TestInputWord.of(PETER, WORD_WELT)),
57                         KeyValue.pair(
58                                         TestInputUser.of(PETER),
59                                         TestInputWord.of(PETER, WORD_BOÄH)),
60                         KeyValue.pair(
61                                         TestInputUser.of(KLAUS),
62                                         TestInputWord.of(KLAUS, WORD_S)),
63                         KeyValue.pair(
64                                         TestInputUser.of(PETER),
65                                         TestInputWord.of(PETER, WORD_BOÄH)),
66                         KeyValue.pair(
67                                         TestInputUser.of(KLAUS),
68                                         TestInputWord.of(KLAUS, WORD_S)),
69         };
70
71         static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
72         {
73                 return Stream.of(TestData.INPUT_MESSAGES);
74         }
75
76         static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
77         {
78                 expectedMessages().forEach(
79                                 (word, counter) ->
80                                                 assertThat(receivedMessages.get(word))
81                                                                 .containsExactlyElementsOf(counter));
82         }
83
84         static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
85         {
86                 assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
87                 assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
88                 assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
89                 assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
90                 assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
91         }
92
93         private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
94         {
95                 return messagesForUsers.get(word) == null
96                                 ? 0
97                                 : messagesForUsers.get(word).size();
98         }
99
100         static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
101         {
102                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
103                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
104                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
105                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
106                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
107         }
108
109         private static Word wordOf(TestOutputWord testOutputWord)
110         {
111                 Word word = new Word();
112
113                 word.setUser(testOutputWord.getUser());
114                 word.setWord(testOutputWord.getWord());
115
116                 return word;
117         }
118
119         static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
120         {
121                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
122                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
123                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
124                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
125                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
126         }
127
128         private static void assertWordCountEqualsWordCountFromLastMessage(
129                         TestOutputWord word,
130                         Long counter)
131         {
132                 TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
133                                 word.getUser(),
134                                 word.getWord(),
135                                 counter);
136                 assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
137         }
138
139         private static void assertWordCountEqualsWordCountFromLastMessage(
140                         TestOutputWord word,
141                         TestOutputWordCounter counter)
142         {
143                 assertThat(counter).isEqualTo(getLastMessageFor(word));
144         }
145
146         private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
147         {
148                 return getLastMessageFor(word, expectedMessages());
149         }
150
151         private static TestOutputWordCounter getLastMessageFor(
152                         TestOutputWord user,
153                         MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
154         {
155                 return messagesForWord
156                                 .get(user)
157                                 .stream()
158                                 .reduce(null, (left, right) -> right);
159         }
160
161         private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
162         {
163                         KeyValue.pair(
164                                         PETER_HALLO,
165                                         TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
166                         KeyValue.pair(
167                                         KLAUS_MÜSCH,
168                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
169                         KeyValue.pair(
170                                         PETER_WELT,
171                                         TestOutputWordCounter.of(PETER, WORD_WELT,1)),
172                         KeyValue.pair(
173                                         KLAUS_MÜSCH,
174                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
175                         KeyValue.pair(
176                                         KLAUS_S,
177                                         TestOutputWordCounter.of(KLAUS, WORD_S,1)),
178                         KeyValue.pair(
179                                         PETER_BOÄH,
180                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
181                         KeyValue.pair(
182                                         PETER_WELT,
183                                         TestOutputWordCounter.of(PETER, WORD_WELT,2)),
184                         KeyValue.pair(
185                                         PETER_BOÄH,
186                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
187                         KeyValue.pair(
188                                         KLAUS_S,
189                                         TestOutputWordCounter.of(KLAUS, WORD_S,2)),
190                         KeyValue.pair(
191                                         PETER_BOÄH,
192                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
193                         KeyValue.pair(
194                                         KLAUS_S,
195                                         TestOutputWordCounter.of(KLAUS, WORD_S,3)),
196         };
197
198         static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
199         {
200                 MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
201                 Stream
202                                 .of(EXPECTED_MESSAGES)
203                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
204                 return expectedMessages;
205         }
206 }