counter: 1.4.2 - RocksDB does nor work in Alpine-Linux
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / TestData.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import de.juplo.kafka.wordcount.splitter.TestInputUser;
4 import de.juplo.kafka.wordcount.splitter.TestInputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWord;
6 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
11
12 import java.util.stream.Stream;
13
14 import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.TYPE;
15 import static org.assertj.core.api.Assertions.assertThat;
16
17
18 class TestData
19 {
20         static final String PETER = "peter";
21         static final String KLAUS = "klaus";
22
23         static final String WORD_HALLO = "Hallo";
24         static final String WORD_MÜSCH = "Müsch";
25         static final String WORD_WELT = "Welt";
26         static final String WORD_S = "s";
27         static final String WORD_BOÄH = "Boäh";
28
29         static final TestOutputWord PETER_HALLO = TestOutputWord.of(TYPE, PETER, WORD_HALLO);
30         static final TestOutputWord PETER_WELT = TestOutputWord.of(TYPE, PETER, WORD_WELT);
31         static final TestOutputWord PETER_BOÄH = TestOutputWord.of(TYPE, PETER, WORD_BOÄH);
32         static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(TYPE, KLAUS, WORD_MÜSCH);
33         static final TestOutputWord KLAUS_S = TestOutputWord.of(TYPE, KLAUS, WORD_S);
34
35         private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
36         {
37                         KeyValue.pair(
38                                         TestInputUser.of(PETER),
39                                         TestInputWord.of(PETER, WORD_HALLO)),
40                         KeyValue.pair(
41                                         TestInputUser.of(KLAUS),
42                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
43                         KeyValue.pair(
44                                         TestInputUser.of(PETER),
45                                         TestInputWord.of(PETER, WORD_WELT)),
46                         KeyValue.pair(
47                                         TestInputUser.of(KLAUS),
48                                         TestInputWord.of(KLAUS, WORD_MÜSCH)),
49                         KeyValue.pair(
50                                         TestInputUser.of(KLAUS),
51                                         TestInputWord.of(KLAUS, WORD_S)),
52                         KeyValue.pair(
53                                         TestInputUser.of(PETER),
54                                         TestInputWord.of(PETER, WORD_BOÄH)),
55                         KeyValue.pair(
56                                         TestInputUser.of(PETER),
57                                         TestInputWord.of(PETER, WORD_WELT)),
58                         KeyValue.pair(
59                                         TestInputUser.of(PETER),
60                                         TestInputWord.of(PETER, WORD_BOÄH)),
61                         KeyValue.pair(
62                                         TestInputUser.of(KLAUS),
63                                         TestInputWord.of(KLAUS, WORD_S)),
64                         KeyValue.pair(
65                                         TestInputUser.of(PETER),
66                                         TestInputWord.of(PETER, WORD_BOÄH)),
67                         KeyValue.pair(
68                                         TestInputUser.of(KLAUS),
69                                         TestInputWord.of(KLAUS, WORD_S)),
70         };
71
72         static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
73         {
74                 return Stream.of(TestData.INPUT_MESSAGES);
75         }
76
77         static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
78         {
79                 expectedMessages().forEach(
80                                 (word, counter) ->
81                                                 assertThat(receivedMessages.get(word))
82                                                                 .containsExactlyElementsOf(counter));
83         }
84
85         static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
86         {
87                 assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
88                 assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
89                 assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
90                 assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
91                 assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
92         }
93
94         private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
95         {
96                 return messagesForUsers.get(word) == null
97                                 ? 0
98                                 : messagesForUsers.get(word).size();
99         }
100
101         static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
102         {
103                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
104                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
105                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
106                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
107                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
108         }
109
110         private static Word wordOf(TestOutputWord testOutputWord)
111         {
112                 return Word.of(
113                                 testOutputWord.getChannel(),
114                                 testOutputWord.getKey());
115         }
116
117         static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
118         {
119                 assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
120                 assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
121                 assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
122                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
123                 assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
124         }
125
126         private static void assertWordCountEqualsWordCountFromLastMessage(
127                         TestOutputWord word,
128                         Long counter)
129         {
130                 TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
131                                 word.getChannel(),
132                                 word.getKey(),
133                                 counter);
134                 assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
135         }
136
137         private static void assertWordCountEqualsWordCountFromLastMessage(
138                         TestOutputWord word,
139                         TestOutputWordCounter counter)
140         {
141                 assertThat(counter).isEqualTo(getLastMessageFor(word));
142         }
143
144         private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
145         {
146                 return getLastMessageFor(word, expectedMessages());
147         }
148
149         private static TestOutputWordCounter getLastMessageFor(
150                         TestOutputWord user,
151                         MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
152         {
153                 return messagesForWord
154                                 .get(user)
155                                 .stream()
156                                 .reduce(null, (left, right) -> right);
157         }
158
159         private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
160         {
161                         KeyValue.pair(
162                                         PETER_HALLO,
163                                         TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
164                         KeyValue.pair(
165                                         KLAUS_MÜSCH,
166                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
167                         KeyValue.pair(
168                                         PETER_WELT,
169                                         TestOutputWordCounter.of(PETER, WORD_WELT,1)),
170                         KeyValue.pair(
171                                         KLAUS_MÜSCH,
172                                         TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
173                         KeyValue.pair(
174                                         KLAUS_S,
175                                         TestOutputWordCounter.of(KLAUS, WORD_S,1)),
176                         KeyValue.pair(
177                                         PETER_BOÄH,
178                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
179                         KeyValue.pair(
180                                         PETER_WELT,
181                                         TestOutputWordCounter.of(PETER, WORD_WELT,2)),
182                         KeyValue.pair(
183                                         PETER_BOÄH,
184                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
185                         KeyValue.pair(
186                                         KLAUS_S,
187                                         TestOutputWordCounter.of(KLAUS, WORD_S,2)),
188                         KeyValue.pair(
189                                         PETER_BOÄH,
190                                         TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
191                         KeyValue.pair(
192                                         KLAUS_S,
193                                         TestOutputWordCounter.of(KLAUS, WORD_S,3)),
194         };
195
196         static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
197         {
198                 MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
199                 Stream
200                                 .of(EXPECTED_MESSAGES)
201                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
202                 return expectedMessages;
203         }
204 }