1 package de.juplo.kafka.wordcount.popular;
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
13 import java.time.Clock;
14 import java.time.Instant;
15 import java.time.ZonedDateTime;
16 import java.util.function.BiConsumer;
17 import java.util.stream.Stream;
19 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
20 import static org.assertj.core.api.Assertions.assertThat;
26 static final Clock CLOCK = Clock.fixed(
27 Clock.systemDefaultZone().instant(),
28 Clock.systemDefaultZone().getZone());
29 static final String PETER = "peter";
30 static final String KLAUS = "klaus";
32 static final String WORD_HALLO = "Hallo";
33 static final String WORD_MÜSCH = "Müsch";
34 static final String WORD_WELT = "Welt";
35 static final String WORD_S = "s";
36 static final String WORD_BOÄH = "Boäh";
38 static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
39 static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
40 static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
41 static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
42 static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
44 private static Instant windowStart()
46 return windowBoundFor(CLOCK.instant(), 0);
49 private static Instant windowEnd()
51 return windowBoundFor(CLOCK.instant(), WINDOW_SIZE.toSecondsPart());
54 private static Instant windowBoundFor(Instant instant, int second)
56 return instantOfSecond(second, 0);
/*
 * The messages that sendInputMessages(..) replays, in array order.
 * The visible values contain "Hallo" once, "Müsch" and "Welt" twice,
 * "s" and "Boäh" three times, which matches the final counters in
 * EXPECTED_MESSAGES.
 *
 * NOTE(review): only the InputWord value of each entry is visible in this
 * excerpt; the timestamp and key of each message cannot be seen here.
 * NOTE(review): the array is created with the raw type TestMessage[], which
 * is an unchecked conversion.
 */
59 private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
64 InputWord.of(PETER, WORD_HALLO)),
68 InputWord.of(KLAUS, WORD_MÜSCH)),
72 InputWord.of(PETER, WORD_WELT)),
76 InputWord.of(KLAUS, WORD_MÜSCH)),
80 InputWord.of(KLAUS, WORD_S)),
84 InputWord.of(PETER, WORD_BOÄH)),
88 InputWord.of(PETER, WORD_WELT)),
92 InputWord.of(PETER, WORD_BOÄH)),
96 InputWord.of(KLAUS, WORD_S)),
100 InputWord.of(PETER, WORD_BOÄH)),
104 InputWord.of(KLAUS, WORD_S)),
107 private static Instant instantOfSecond(int second)
109 return instantOfSecond(second, 0);
/*
 * Derives an instant from CLOCK: the clock's instant is viewed in the clock's
 * zone and its nano-of-second is replaced by the given value.
 *
 * NOTE(review): how `second` is applied is not visible in this excerpt
 * (presumably via withSecond(second)) -- confirm against the full file.
 * NOTE(review): the parameter name "naonSeconds" is a typo for "nanoSeconds".
 */
112 private static Instant instantOfSecond(int second, int naonSeconds)
115 .ofInstant(CLOCK.instant(), CLOCK.getZone())
118 .withNano(naonSeconds)
122 private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
124 return Stream.of(TestData.INPUT_MESSAGES);
127 static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
129 getInputMessages().forEach(message ->
131 log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
132 consumer.accept(message.time, new KeyValue<>(message.key, message.value));
136 static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
138 expectedMessages().forEach(
140 assertThat(receivedMessages.get(word))
141 .containsExactlyElementsOf(counter));
144 static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
146 assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
147 assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
148 assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
149 assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
150 assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
153 private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
155 return messagesForUsers.get(word) == null
157 : messagesForUsers.get(word).size();
160 static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
162 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
163 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
164 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
165 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
166 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
169 private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
171 WindowedWord windowedWord = new WindowedWord();
173 windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
174 windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
175 windowedWord.setWord(outputWindowedWord.getWord());
180 static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
182 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
183 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
184 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
185 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
186 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
/*
 * Converts `counter` into an OutputWordCounter and delegates to the overload
 * that takes an OutputWordCounter, which compares it with the last expected
 * message for `word`.
 *
 * NOTE(review): the declaration of `counter` and the first argument of
 * OutputWordCounter.of(..) are not visible in this excerpt; the caller
 * assertExpectedState(..) passes the result of store.get(..), so `counter`
 * is presumably a WordCounter. If the store has no entry, `counter` would be
 * null and counter.getCounter() would throw -- confirm.
 */
189 private static void assertWordCountEqualsWordCountFromLastMessage(
190 OutputWindowedWord word,
193 OutputWordCounter outputWordCounter = OutputWordCounter.of(
195 counter.getCounter());
196 assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
199 private static void assertWordCountEqualsWordCountFromLastMessage(
200 OutputWindowedWord word,
201 OutputWordCounter counter)
203 assertThat(counter).isEqualTo(getLastMessageFor(word));
206 private static OutputWordCounter getLastMessageFor(OutputWindowedWord word)
208 return getLastMessageFor(word, expectedMessages());
211 private static OutputWordCounter getLastMessageFor(
212 OutputWindowedWord user,
213 MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForWord)
215 return messagesForWord
218 .reduce(null, (left, right) -> right);
/*
 * The expected output, in the order in which expectedMessages() adds it to
 * its result. For each word, the counter of successive entries increases by
 * one; the final values are 1 for "Hallo", 2 for "Müsch" and "Welt", and 3
 * for "s" and "Boäh".
 *
 * NOTE(review): the key of each entry is not visible in this excerpt;
 * presumably it is the WINDOWED_WORD_* constant that matches the word of the
 * counter -- confirm against the full file.
 * NOTE(review): the array is created with the raw type KeyValue[], which is
 * an unchecked conversion.
 */
221 private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
225 OutputWordCounter.of(WORD_HALLO,1)),
228 OutputWordCounter.of(WORD_MÜSCH,1)),
231 OutputWordCounter.of(WORD_WELT,1)),
234 OutputWordCounter.of(WORD_MÜSCH,2)),
237 OutputWordCounter.of(WORD_S,1)),
240 OutputWordCounter.of(WORD_BOÄH,1)),
243 OutputWordCounter.of(WORD_WELT,2)),
246 OutputWordCounter.of(WORD_BOÄH,2)),
249 OutputWordCounter.of(WORD_S,2)),
252 OutputWordCounter.of(WORD_BOÄH,3)),
255 OutputWordCounter.of(WORD_S,3)),
258 static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
260 MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
262 .of(EXPECTED_MESSAGES)
263 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
264 return expectedMessages;