1 package de.juplo.kafka.wordcount.popular;
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
13 import java.time.Clock;
14 import java.time.Instant;
15 import java.time.ZonedDateTime;
16 import java.util.function.BiConsumer;
17 import java.util.stream.Stream;
19 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
20 import static org.assertj.core.api.Assertions.assertThat;
26 static final Clock CLOCK = Clock.fixed(
27 Clock.systemDefaultZone().instant(),
28 Clock.systemDefaultZone().getZone());
29 static final String PETER = "peter";
30 static final String KLAUS = "klaus";
32 static final String WORD_HALLO = "Hallo";
33 static final String WORD_MÜSCH = "Müsch";
34 static final String WORD_WELT = "Welt";
35 static final String WORD_S = "s";
36 static final String WORD_BOÄH = "Boäh";
38 static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
39 static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
40 static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
41 static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
42 static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
44 private static Instant windowStart()
46 return windowBoundFor(0);
49 private static Instant windowEnd()
51 return windowBoundFor(WINDOW_SIZE.toSecondsPart());
54 private static Instant windowBoundFor(int second)
56 return instantOfSecond(second, 0);
59 private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
64 InputWord.of(PETER, WORD_HALLO)),
68 InputWord.of(KLAUS, WORD_MÜSCH)),
72 InputWord.of(PETER, WORD_WELT)),
76 InputWord.of(KLAUS, WORD_MÜSCH)),
80 InputWord.of(KLAUS, WORD_S)),
84 InputWord.of(PETER, WORD_BOÄH)),
88 InputWord.of(PETER, WORD_WELT)),
92 InputWord.of(PETER, WORD_BOÄH)),
96 InputWord.of(KLAUS, WORD_S)),
100 InputWord.of(PETER, WORD_BOÄH)),
104 InputWord.of(KLAUS, WORD_S)),
106 instantOfSecond((int)WINDOW_SIZE.toSeconds()),
108 InputWord.of(PETER, WORD_HALLO)),
110 instantOfSecond((int)WINDOW_SIZE.toSeconds()),
112 InputWord.of(KLAUS, WORD_MÜSCH)),
114 instantOfSecond((int)WINDOW_SIZE.toSeconds()),
116 InputWord.of(PETER, WORD_WELT)),
118 instantOfSecond((int)WINDOW_SIZE.toSeconds()),
120 InputWord.of(KLAUS, WORD_S)),
122 instantOfSecond((int)WINDOW_SIZE.toSeconds()),
124 InputWord.of(PETER, WORD_BOÄH)),
127 private static Instant instantOfSecond(int second)
129 return instantOfSecond(second, 0);
132 private static Instant instantOfSecond(int second, int naonSeconds)
135 .ofInstant(CLOCK.instant(), CLOCK.getZone())
138 .withNano(naonSeconds)
142 private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
144 return Stream.of(TestData.INPUT_MESSAGES);
147 static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
149 getInputMessages().forEach(message ->
151 log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
152 consumer.accept(message.time, new KeyValue<>(message.key, message.value));
156 static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
158 expectedMessages().forEach(
160 assertThat(receivedMessages.get(word))
161 .containsExactlyElementsOf(counter));
164 static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
166 assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
167 assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
168 assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
169 assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
170 assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
173 private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
175 return messagesForUsers.get(word) == null
177 : messagesForUsers.get(word).size();
180 static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
182 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
183 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
184 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
185 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
186 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
189 private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
191 WindowedWord windowedWord = new WindowedWord();
193 windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
194 windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
195 windowedWord.setKey(outputWindowedWord.getKey());
200 static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
202 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
203 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
204 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
205 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
206 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
209 private static void assertWordCountEqualsWordCountFromLastMessage(
210 OutputWindowedWord word,
213 OutputWordCounter outputWordCounter = OutputWordCounter.of(
215 counter.getCounter());
216 assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
219 private static void assertWordCountEqualsWordCountFromLastMessage(
220 OutputWindowedWord word,
221 OutputWordCounter counter)
223 assertThat(counter).isEqualTo(getLastMessageFor(word));
226 private static OutputWordCounter getLastMessageFor(OutputWindowedWord word)
228 return getLastMessageFor(word, expectedMessages());
231 private static OutputWordCounter getLastMessageFor(
232 OutputWindowedWord user,
233 MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForWord)
235 return messagesForWord
238 .reduce(null, (left, right) -> right);
241 private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
245 OutputWordCounter.of(WORD_HALLO,1)),
248 OutputWordCounter.of(WORD_MÜSCH,2)),
251 OutputWordCounter.of(WORD_WELT,2)),
254 OutputWordCounter.of(WORD_BOÄH,3)),
257 OutputWordCounter.of(WORD_S,3)),
260 static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
262 MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
264 .of(EXPECTED_MESSAGES)
265 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
266 return expectedMessages;