popular: 1.1.1 - Refined output JSON
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / TestData.java
1 package de.juplo.kafka.wordcount.popular;
2
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
12
13 import java.time.Clock;
14 import java.time.Instant;
15 import java.time.ZonedDateTime;
16 import java.util.function.BiConsumer;
17 import java.util.stream.Stream;
18
19 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
20 import static org.assertj.core.api.Assertions.assertThat;
21
22
23 @Slf4j
24 class TestData
25 {
26         static final Clock CLOCK = Clock.fixed(
27                         Clock.systemDefaultZone().instant(),
28                         Clock.systemDefaultZone().getZone());
29         static final String PETER = "peter";
30         static final String KLAUS = "klaus";
31
32         static final String WORD_HALLO = "Hallo";
33         static final String WORD_MÜSCH = "Müsch";
34         static final String WORD_WELT = "Welt";
35         static final String WORD_S = "s";
36         static final String WORD_BOÄH = "Boäh";
37
38         static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
39         static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
40         static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
41         static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
42         static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
43
44         private static Instant windowStart()
45         {
46                 return windowBoundFor(CLOCK.instant(), 0);
47         }
48
49         private static Instant windowEnd()
50         {
51                 return windowBoundFor(CLOCK.instant(), WINDOW_SIZE.toSecondsPart());
52         }
53
54         private static Instant windowBoundFor(Instant instant, int second)
55         {
56                 return instantOfSecond(second, 0);
57         }
58
59         private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
60         {
61                         TestMessage.of(
62                                         instantOfSecond(0),
63                                         InputUser.of(PETER),
64                                         InputWord.of(PETER, WORD_HALLO)),
65                         TestMessage.of(
66                                         instantOfSecond(13),
67                                         InputUser.of(KLAUS),
68                                         InputWord.of(KLAUS, WORD_MÜSCH)),
69                         TestMessage.of(
70                                         instantOfSecond(0),
71                                         InputUser.of(PETER),
72                                         InputWord.of(PETER, WORD_WELT)),
73                         TestMessage.of(
74                                         instantOfSecond(14),
75                                         InputUser.of(KLAUS),
76                                         InputWord.of(KLAUS, WORD_MÜSCH)),
77                         TestMessage.of(
78                                         instantOfSecond(14),
79                                         InputUser.of(KLAUS),
80                                         InputWord.of(KLAUS, WORD_S)),
81                         TestMessage.of(
82                                         instantOfSecond(14),
83                                         InputUser.of(PETER),
84                                         InputWord.of(PETER, WORD_BOÄH)),
85                         TestMessage.of(
86                                         instantOfSecond(14),
87                                         InputUser.of(PETER),
88                                         InputWord.of(PETER, WORD_WELT)),
89                         TestMessage.of(
90                                         instantOfSecond(15),
91                                         InputUser.of(PETER),
92                                         InputWord.of(PETER, WORD_BOÄH)),
93                         TestMessage.of(
94                                         instantOfSecond(15),
95                                         InputUser.of(KLAUS),
96                                         InputWord.of(KLAUS, WORD_S)),
97                         TestMessage.of(
98                                         instantOfSecond(29),
99                                         InputUser.of(PETER),
100                                         InputWord.of(PETER, WORD_BOÄH)),
101                         TestMessage.of(
102                                         instantOfSecond(20),
103                                         InputUser.of(KLAUS),
104                                         InputWord.of(KLAUS, WORD_S)),
105                         TestMessage.of(
106                                         instantOfSecond((int)WINDOW_SIZE.toSeconds()),
107                                         InputUser.of(PETER),
108                                         InputWord.of(PETER, WORD_HALLO)),
109                         TestMessage.of(
110                                         instantOfSecond((int)WINDOW_SIZE.toSeconds()),
111                                         InputUser.of(KLAUS),
112                                         InputWord.of(KLAUS, WORD_MÜSCH)),
113                         TestMessage.of(
114                                         instantOfSecond((int)WINDOW_SIZE.toSeconds()),
115                                         InputUser.of(PETER),
116                                         InputWord.of(PETER, WORD_WELT)),
117                         TestMessage.of(
118                                         instantOfSecond((int)WINDOW_SIZE.toSeconds()),
119                                         InputUser.of(KLAUS),
120                                         InputWord.of(KLAUS, WORD_S)),
121                         TestMessage.of(
122                                         instantOfSecond((int)WINDOW_SIZE.toSeconds()),
123                                         InputUser.of(PETER),
124                                         InputWord.of(PETER, WORD_BOÄH)),
125         };
126
127         private static Instant instantOfSecond(int second)
128         {
129                 return instantOfSecond(second, 0);
130         }
131
132         private static Instant instantOfSecond(int second, int naonSeconds)
133         {
134                 return ZonedDateTime
135                                 .ofInstant(CLOCK.instant(), CLOCK.getZone())
136                                 .withSecond(0)
137                                 .plusSeconds(second)
138                                 .withNano(naonSeconds)
139                                 .toInstant();
140         }
141
142         private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
143         {
144                 return Stream.of(TestData.INPUT_MESSAGES);
145         }
146
147         static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
148         {
149                 getInputMessages().forEach(message ->
150                 {
151                         log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
152                         consumer.accept(message.time, new KeyValue<>(message.key, message.value));
153                 });
154         }
155
156         static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
157         {
158                 expectedMessages().forEach(
159                                 (word, counter) ->
160                                                 assertThat(receivedMessages.get(word))
161                                                                 .containsExactlyElementsOf(counter));
162         }
163
164         static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
165         {
166                 assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
167                 assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
168                 assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
169                 assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
170                 assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
171         }
172
173         private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
174         {
175                 return messagesForUsers.get(word) == null
176                                 ? 0
177                                 : messagesForUsers.get(word).size();
178         }
179
180         static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
181         {
182                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
183                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
184                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
185                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
186                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
187         }
188
189         private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
190         {
191                 WindowedWord windowedWord = new WindowedWord();
192
193                 windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
194                 windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
195                 windowedWord.setKey(outputWindowedWord.getKey());
196
197                 return windowedWord;
198         }
199
200         static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
201         {
202                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
203                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
204                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
205                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
206                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
207         }
208
209         private static void assertWordCountEqualsWordCountFromLastMessage(
210                         OutputWindowedWord word,
211                         WordCounter counter)
212         {
213                 OutputWordCounter outputWordCounter = OutputWordCounter.of(
214                                 word.getKey(),
215                                 counter.getCounter());
216                 assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
217         }
218
219         private static void assertWordCountEqualsWordCountFromLastMessage(
220                         OutputWindowedWord word,
221                         OutputWordCounter counter)
222         {
223                 assertThat(counter).isEqualTo(getLastMessageFor(word));
224         }
225
226         private static OutputWordCounter getLastMessageFor(OutputWindowedWord word)
227         {
228                 return getLastMessageFor(word, expectedMessages());
229         }
230
231         private static OutputWordCounter getLastMessageFor(
232                         OutputWindowedWord user,
233                         MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForWord)
234         {
235                 return messagesForWord
236                                 .get(user)
237                                 .stream()
238                                 .reduce(null, (left, right) -> right);
239         }
240
241         private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
242         {
243                         KeyValue.pair(
244                                         WINDOWED_WORD_HALLO,
245                                         OutputWordCounter.of(WORD_HALLO,1)),
246                         KeyValue.pair(
247                                         WINDOWED_WORD_MÜSCH,
248                                         OutputWordCounter.of(WORD_MÜSCH,2)),
249                         KeyValue.pair(
250                                         WINDOWED_WORD_WELT,
251                                         OutputWordCounter.of(WORD_WELT,2)),
252                         KeyValue.pair(
253                                         WINDOWED_WORD_BOÄH,
254                                         OutputWordCounter.of(WORD_BOÄH,3)),
255                         KeyValue.pair(
256                                         WINDOWED_WORD_S,
257                                         OutputWordCounter.of(WORD_S,3)),
258         };
259
260         static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
261         {
262                 MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
263                 Stream
264                                 .of(EXPECTED_MESSAGES)
265                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
266                 return expectedMessages;
267         }
268 }