6d315751285f3409bd6d070672d647cdf772aefb
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / TestData.java
1 package de.juplo.kafka.wordcount.popular;
2
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
12
13 import java.time.Clock;
14 import java.time.Instant;
15 import java.time.ZonedDateTime;
16 import java.util.function.BiConsumer;
17 import java.util.stream.Stream;
18
19 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
20 import static org.assertj.core.api.Assertions.assertThat;
21
22
23 @Slf4j
24 class TestData
25 {
26         static final Clock CLOCK = Clock.fixed(
27                         Clock.systemDefaultZone().instant(),
28                         Clock.systemDefaultZone().getZone());
29         static final String PETER = "peter";
30         static final String KLAUS = "klaus";
31
32         static final String WORD_HALLO = "Hallo";
33         static final String WORD_MÜSCH = "Müsch";
34         static final String WORD_WELT = "Welt";
35         static final String WORD_S = "s";
36         static final String WORD_BOÄH = "Boäh";
37
38         static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), WORD_HALLO);
39         static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), WORD_WELT);
40         static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), WORD_BOÄH);
41         static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), WORD_MÜSCH);
42         static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), WORD_S);
43
44         private static String windowStart()
45         {
46                 return toEpochSecond(windowBoundFor(0));
47         }
48
49         private static String toEpochSecond(Instant instant)
50         {
51                 return Long.toString(instant.getEpochSecond());
52         }
53
54         private static Instant windowBoundFor(int second)
55         {
56                 return instantOfSecond(second, 0);
57         }
58
59         private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
60         {
61                         TestMessage.of(
62                                         instantOfSecond(0),
63                                         InputUser.of(PETER),
64                                         InputWord.of(PETER, WORD_HALLO)),
65                         TestMessage.of(
66                                         instantOfSecond(13),
67                                         InputUser.of(KLAUS),
68                                         InputWord.of(KLAUS, WORD_MÜSCH)),
69                         TestMessage.of(
70                                         instantOfSecond(0),
71                                         InputUser.of(PETER),
72                                         InputWord.of(PETER, WORD_WELT)),
73                         TestMessage.of(
74                                         instantOfSecond(14),
75                                         InputUser.of(KLAUS),
76                                         InputWord.of(KLAUS, WORD_MÜSCH)),
77                         TestMessage.of(
78                                         instantOfSecond(14),
79                                         InputUser.of(KLAUS),
80                                         InputWord.of(KLAUS, WORD_S)),
81                         TestMessage.of(
82                                         instantOfSecond(14),
83                                         InputUser.of(PETER),
84                                         InputWord.of(PETER, WORD_BOÄH)),
85                         TestMessage.of(
86                                         instantOfSecond(14),
87                                         InputUser.of(PETER),
88                                         InputWord.of(PETER, WORD_WELT)),
89                         TestMessage.of(
90                                         instantOfSecond(15),
91                                         InputUser.of(PETER),
92                                         InputWord.of(PETER, WORD_BOÄH)),
93                         TestMessage.of(
94                                         instantOfSecond(15),
95                                         InputUser.of(KLAUS),
96                                         InputWord.of(KLAUS, WORD_S)),
97                         TestMessage.of(
98                                         instantOfSecond(29),
99                                         InputUser.of(PETER),
100                                         InputWord.of(PETER, WORD_BOÄH)),
101                         TestMessage.of(
102                                         instantOfSecond(20),
103                                         InputUser.of(KLAUS),
104                                         InputWord.of(KLAUS, WORD_S)),
105                         TestMessage.of(
106                                         instantOfSecond((int)WINDOW_SIZE.toSeconds()),
107                                         InputUser.of(PETER),
108                                         InputWord.of(PETER, WORD_HALLO)),
109                         TestMessage.of(
110                                         instantOfSecond((int)WINDOW_SIZE.toSeconds()),
111                                         InputUser.of(KLAUS),
112                                         InputWord.of(KLAUS, WORD_MÜSCH)),
113                         TestMessage.of(
114                                         instantOfSecond((int)WINDOW_SIZE.toSeconds()),
115                                         InputUser.of(PETER),
116                                         InputWord.of(PETER, WORD_WELT)),
117                         TestMessage.of(
118                                         instantOfSecond((int)WINDOW_SIZE.toSeconds()),
119                                         InputUser.of(KLAUS),
120                                         InputWord.of(KLAUS, WORD_S)),
121                         TestMessage.of(
122                                         instantOfSecond((int)WINDOW_SIZE.toSeconds()),
123                                         InputUser.of(PETER),
124                                         InputWord.of(PETER, WORD_BOÄH)),
125         };
126
127         private static Instant instantOfSecond(int second)
128         {
129                 return instantOfSecond(second, 0);
130         }
131
132         private static Instant instantOfSecond(int second, int naonSeconds)
133         {
134                 return ZonedDateTime
135                                 .ofInstant(CLOCK.instant(), CLOCK.getZone())
136                                 .withSecond(0)
137                                 .plusSeconds(second)
138                                 .withNano(naonSeconds)
139                                 .toInstant();
140         }
141
142         private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
143         {
144                 return Stream.of(TestData.INPUT_MESSAGES);
145         }
146
147         static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
148         {
149                 getInputMessages().forEach(message ->
150                 {
151                         log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
152                         consumer.accept(message.time, new KeyValue<>(message.key, message.value));
153                 });
154         }
155
156         static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
157         {
158                 expectedMessages().forEach(
159                                 (word, counter) ->
160                                                 assertThat(receivedMessages.get(word))
161                                                                 .containsExactlyElementsOf(counter));
162         }
163
164         static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
165         {
166                 assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
167                 assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
168                 assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
169                 assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
170                 assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
171         }
172
173         private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
174         {
175                 return messagesForUsers.get(word) == null
176                                 ? 0
177                                 : messagesForUsers.get(word).size();
178         }
179
180         static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
181         {
182                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
183                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
184                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
185                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
186                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
187         }
188
189         private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
190         {
191                 WindowedWord windowedWord = new WindowedWord();
192
193                 windowedWord.setTime(outputWindowedWord.getTime());
194                 windowedWord.setKey(outputWindowedWord.getKey());
195
196                 return windowedWord;
197         }
198
199         static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
200         {
201                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
202                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
203                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
204                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
205                 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
206         }
207
208         private static void assertWordCountEqualsWordCountFromLastMessage(
209                         OutputWindowedWord word,
210                         WordCounter counter)
211         {
212                 OutputWordCounter outputWordCounter = OutputWordCounter.of(
213                                 word.getKey(),
214                                 counter.getCounter());
215                 assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
216         }
217
218         private static void assertWordCountEqualsWordCountFromLastMessage(
219                         OutputWindowedWord word,
220                         OutputWordCounter counter)
221         {
222                 assertThat(counter).isEqualTo(getLastMessageFor(word));
223         }
224
225         private static OutputWordCounter getLastMessageFor(OutputWindowedWord word)
226         {
227                 return getLastMessageFor(word, expectedMessages());
228         }
229
230         private static OutputWordCounter getLastMessageFor(
231                         OutputWindowedWord user,
232                         MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForWord)
233         {
234                 return messagesForWord
235                                 .get(user)
236                                 .stream()
237                                 .reduce(null, (left, right) -> right);
238         }
239
240         private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
241         {
242                         KeyValue.pair(
243                                         WINDOWED_WORD_HALLO,
244                                         OutputWordCounter.of(WORD_HALLO,1)),
245                         KeyValue.pair(
246                                         WINDOWED_WORD_MÜSCH,
247                                         OutputWordCounter.of(WORD_MÜSCH,2)),
248                         KeyValue.pair(
249                                         WINDOWED_WORD_WELT,
250                                         OutputWordCounter.of(WORD_WELT,2)),
251                         KeyValue.pair(
252                                         WINDOWED_WORD_BOÄH,
253                                         OutputWordCounter.of(WORD_BOÄH,3)),
254                         KeyValue.pair(
255                                         WINDOWED_WORD_S,
256                                         OutputWordCounter.of(WORD_S,3)),
257         };
258
259         static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
260         {
261                 MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
262                 Stream
263                                 .of(EXPECTED_MESSAGES)
264                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
265                 return expectedMessages;
266         }
267 }