popular: 1.0.0 - Word are counted for hopping time-windows
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / TestData.java
1 package de.juplo.kafka.wordcount.popular;
2
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
12
13 import java.time.Clock;
14 import java.time.Instant;
15 import java.time.ZonedDateTime;
16 import java.util.function.BiConsumer;
17 import java.util.stream.Stream;
18
19 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
20 import static org.assertj.core.api.Assertions.assertThat;
21
22
23 @Slf4j
24 class TestData
25 {
26         static final Clock CLOCK = Clock.fixed(
27                         Clock.systemDefaultZone().instant(),
28                         Clock.systemDefaultZone().getZone());
29         static final String PETER = "peter";
30         static final String KLAUS = "klaus";
31
32         static final String WORD_HALLO = "Hallo";
33         static final String WORD_MÜSCH = "Müsch";
34         static final String WORD_WELT = "Welt";
35         static final String WORD_S = "s";
36         static final String WORD_BOÄH = "Boäh";
37
38         static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
39         static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
40         static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
41         static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
42         static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
43
44         private static Instant windowStart()
45         {
46                 return windowBoundFor(CLOCK.instant(), 0);
47         }
48
49         private static Instant windowEnd()
50         {
51                 return windowBoundFor(CLOCK.instant(), WINDOW_SIZE.toSecondsPart());
52         }
53
54         private static Instant windowBoundFor(Instant instant, int second)
55         {
56                 return instantOfSecond(second, 0);
57         }
58
59         private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
60         {
61                         TestMessage.of(
62                                         instantOfSecond(0),
63                                         InputUser.of(PETER),
64                                         InputWord.of(PETER, WORD_HALLO)),
65                         TestMessage.of(
66                                         instantOfSecond(13),
67                                         InputUser.of(KLAUS),
68                                         InputWord.of(KLAUS, WORD_MÜSCH)),
69                         TestMessage.of(
70                                         instantOfSecond(0),
71                                         InputUser.of(PETER),
72                                         InputWord.of(PETER, WORD_WELT)),
73                         TestMessage.of(
74                                         instantOfSecond(14),
75                                         InputUser.of(KLAUS),
76                                         InputWord.of(KLAUS, WORD_MÜSCH)),
77                         TestMessage.of(
78                                         instantOfSecond(14),
79                                         InputUser.of(KLAUS),
80                                         InputWord.of(KLAUS, WORD_S)),
81                         TestMessage.of(
82                                         instantOfSecond(14),
83                                         InputUser.of(PETER),
84                                         InputWord.of(PETER, WORD_BOÄH)),
85                         TestMessage.of(
86                                         instantOfSecond(14),
87                                         InputUser.of(PETER),
88                                         InputWord.of(PETER, WORD_WELT)),
89                         TestMessage.of(
90                                         instantOfSecond(15),
91                                         InputUser.of(PETER),
92                                         InputWord.of(PETER, WORD_BOÄH)),
93                         TestMessage.of(
94                                         instantOfSecond(15),
95                                         InputUser.of(KLAUS),
96                                         InputWord.of(KLAUS, WORD_S)),
97                         TestMessage.of(
98                                         instantOfSecond(29),
99                                         InputUser.of(PETER),
100                                         InputWord.of(PETER, WORD_BOÄH)),
101                         TestMessage.of(
102                                         instantOfSecond(20),
103                                         InputUser.of(KLAUS),
104                                         InputWord.of(KLAUS, WORD_S)),
105         };
106
107         private static Instant instantOfSecond(int second)
108         {
109                 return instantOfSecond(second, 0);
110         }
111
112         private static Instant instantOfSecond(int second, int naonSeconds)
113         {
114                 return ZonedDateTime
115                                 .ofInstant(CLOCK.instant(), CLOCK.getZone())
116                                 .withSecond(second)
117                                 .withNano(naonSeconds)
118                                 .toInstant();
119         }
120
121         private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
122         {
123                 return Stream.of(TestData.INPUT_MESSAGES);
124         }
125
126         static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
127         {
128                 getInputMessages().forEach(message ->
129                 {
130                         log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
131                         consumer.accept(message.time, new KeyValue<>(message.key, message.value));
132                 });
133         }
134
135         static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
136         {
137                 expectedMessages().forEach(
138                                 (word, counter) ->
139                                                 assertThat(receivedMessages.get(word))
140                                                                 .containsExactlyElementsOf(counter));
141         }
142
143         static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
144         {
145                 assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
146                 assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
147                 assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
148                 assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
149                 assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
150         }
151
152         private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
153         {
154                 return messagesForUsers.get(word) == null
155                                 ? 0
156                                 : messagesForUsers.get(word).size();
157         }
158
159         static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
160         {
161                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
162                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
163                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
164                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
165                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
166         }
167
168         private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
169         {
170                 WindowedWord windowedWord = new WindowedWord();
171
172                 windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
173                 windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
174                 windowedWord.setWord(outputWindowedWord.getWord());
175
176                 return windowedWord;
177         }
178
179         static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
180         {
181                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
182                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
183                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
184                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
185                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
186         }
187
188         private static void assertWordCountEqualsWordCountFromLastMessage(
189                         OutputWindowedWord word,
190                         WordCounter counter)
191         {
192                 OutputWordCounter outputWordCounter = OutputWordCounter.of(
193                                 word.getWord(),
194                                 counter.getCounter());
195                 assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
196         }
197
198         private static void assertWordCountEqualsWordCountFromLastMessage(
199                         OutputWindowedWord word,
200                         OutputWordCounter counter)
201         {
202                 assertThat(counter).isEqualTo(getLastMessageFor(word));
203         }
204
205         private static OutputWordCounter getLastMessageFor(OutputWindowedWord word)
206         {
207                 return getLastMessageFor(word, expectedMessages());
208         }
209
210         private static OutputWordCounter getLastMessageFor(
211                         OutputWindowedWord user,
212                         MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForWord)
213         {
214                 return messagesForWord
215                                 .get(user)
216                                 .stream()
217                                 .reduce(null, (left, right) -> right);
218         }
219
220         private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
221         {
222                         KeyValue.pair(
223                                         WINDOWED_WORD_HALLO,
224                                         OutputWordCounter.of(WORD_HALLO,1)),
225                         KeyValue.pair(
226                                         WINDOWED_WORD_MÜSCH,
227                                         OutputWordCounter.of(WORD_MÜSCH,1)),
228                         KeyValue.pair(
229                                         WINDOWED_WORD_WELT,
230                                         OutputWordCounter.of(WORD_WELT,1)),
231                         KeyValue.pair(
232                                         WINDOWED_WORD_MÜSCH,
233                                         OutputWordCounter.of(WORD_MÜSCH,2)),
234                         KeyValue.pair(
235                                         WINDOWED_WORD_S,
236                                         OutputWordCounter.of(WORD_S,1)),
237                         KeyValue.pair(
238                                         WINDOWED_WORD_BOÄH,
239                                         OutputWordCounter.of(WORD_BOÄH,1)),
240                         KeyValue.pair(
241                                         WINDOWED_WORD_WELT,
242                                         OutputWordCounter.of(WORD_WELT,2)),
243                         KeyValue.pair(
244                                         WINDOWED_WORD_BOÄH,
245                                         OutputWordCounter.of(WORD_BOÄH,2)),
246                         KeyValue.pair(
247                                         WINDOWED_WORD_S,
248                                         OutputWordCounter.of(WORD_S,2)),
249                         KeyValue.pair(
250                                         WINDOWED_WORD_BOÄH,
251                                         OutputWordCounter.of(WORD_BOÄH,3)),
252                         KeyValue.pair(
253                                         WINDOWED_WORD_S,
254                                         OutputWordCounter.of(WORD_S,3)),
255         };
256
257         static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
258         {
259                 MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
260                 Stream
261                                 .of(EXPECTED_MESSAGES)
262                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
263                 return expectedMessages;
264         }
265 }