1 package de.juplo.kafka.wordcount.popular;
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
13 import java.time.Clock;
14 import java.time.Instant;
15 import java.time.ZonedDateTime;
16 import java.util.function.BiConsumer;
17 import java.util.stream.Stream;
19 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
20 import static org.assertj.core.api.Assertions.assertThat;
26 static final Clock CLOCK = Clock.fixed(
27 Clock.systemDefaultZone().instant(),
28 Clock.systemDefaultZone().getZone());
29 static final String PETER = "peter";
30 static final String KLAUS = "klaus";
32 static final String WORD_HALLO = "Hallo";
33 static final String WORD_MÜSCH = "Müsch";
34 static final String WORD_WELT = "Welt";
35 static final String WORD_S = "s";
36 static final String WORD_BOÄH = "Boäh";
38 static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
39 static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
40 static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
41 static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
42 static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
44 private static Instant windowStart()
46 return windowBoundFor(CLOCK.instant(), 0);
49 private static Instant windowEnd()
51 return windowBoundFor(CLOCK.instant(), WINDOW_SIZE.toSecondsPart());
54 private static Instant windowBoundFor(Instant instant, int second)
56 return instantOfSecond(second, 0);
59 private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
64 InputWord.of(PETER, WORD_HALLO)),
68 InputWord.of(KLAUS, WORD_MÜSCH)),
72 InputWord.of(PETER, WORD_WELT)),
76 InputWord.of(KLAUS, WORD_MÜSCH)),
80 InputWord.of(KLAUS, WORD_S)),
84 InputWord.of(PETER, WORD_BOÄH)),
88 InputWord.of(PETER, WORD_WELT)),
92 InputWord.of(PETER, WORD_BOÄH)),
96 InputWord.of(KLAUS, WORD_S)),
100 InputWord.of(PETER, WORD_BOÄH)),
104 InputWord.of(KLAUS, WORD_S)),
107 private static Instant instantOfSecond(int second)
109 return instantOfSecond(second, 0);
112 private static Instant instantOfSecond(int second, int naonSeconds)
115 .ofInstant(CLOCK.instant(), CLOCK.getZone())
117 .withNano(naonSeconds)
121 private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
123 return Stream.of(TestData.INPUT_MESSAGES);
126 static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
128 getInputMessages().forEach(message ->
130 log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
131 consumer.accept(message.time, new KeyValue<>(message.key, message.value));
135 static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
137 expectedMessages().forEach(
139 assertThat(receivedMessages.get(word))
140 .containsExactlyElementsOf(counter));
143 static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
145 assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
146 assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
147 assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
148 assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
149 assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
152 private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
154 return messagesForUsers.get(word) == null
156 : messagesForUsers.get(word).size();
159 static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
161 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
162 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
163 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
164 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
165 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
168 private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
170 WindowedWord windowedWord = new WindowedWord();
172 windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
173 windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
174 windowedWord.setWord(outputWindowedWord.getWord());
179 static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
181 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
182 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
183 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
184 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
185 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
188 private static void assertWordCountEqualsWordCountFromLastMessage(
189 OutputWindowedWord word,
192 OutputWordCounter outputWordCounter = OutputWordCounter.of(
194 counter.getCounter());
195 assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
198 private static void assertWordCountEqualsWordCountFromLastMessage(
199 OutputWindowedWord word,
200 OutputWordCounter counter)
202 assertThat(counter).isEqualTo(getLastMessageFor(word));
205 private static OutputWordCounter getLastMessageFor(OutputWindowedWord word)
207 return getLastMessageFor(word, expectedMessages());
210 private static OutputWordCounter getLastMessageFor(
211 OutputWindowedWord user,
212 MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForWord)
214 return messagesForWord
217 .reduce(null, (left, right) -> right);
220 private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
224 OutputWordCounter.of(WORD_HALLO,1)),
227 OutputWordCounter.of(WORD_MÜSCH,1)),
230 OutputWordCounter.of(WORD_WELT,1)),
233 OutputWordCounter.of(WORD_MÜSCH,2)),
236 OutputWordCounter.of(WORD_S,1)),
239 OutputWordCounter.of(WORD_BOÄH,1)),
242 OutputWordCounter.of(WORD_WELT,2)),
245 OutputWordCounter.of(WORD_BOÄH,2)),
248 OutputWordCounter.of(WORD_S,2)),
251 OutputWordCounter.of(WORD_BOÄH,3)),
254 OutputWordCounter.of(WORD_S,3)),
257 static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
259 MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
261 .of(EXPECTED_MESSAGES)
262 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
263 return expectedMessages;