1 package de.juplo.kafka.wordcount.popular;
3 import de.juplo.kafka.wordcount.splitter.InputUser;
4 import de.juplo.kafka.wordcount.splitter.InputWord;
5 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
6 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
13 import java.time.Clock;
14 import java.time.Instant;
15 import java.time.ZonedDateTime;
16 import java.util.function.BiConsumer;
17 import java.util.stream.Stream;
19 import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
20 import static org.assertj.core.api.Assertions.assertThat;
26 static final Clock CLOCK = Clock.fixed(
27 Clock.systemDefaultZone().instant(),
28 Clock.systemDefaultZone().getZone());
29 static final String PETER = "peter";
30 static final String KLAUS = "klaus";
32 static final String WORD_HALLO = "Hallo";
33 static final String WORD_MÜSCH = "Müsch";
34 static final String WORD_WELT = "Welt";
35 static final String WORD_S = "s";
36 static final String WORD_BOÄH = "Boäh";
38 static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), WORD_HALLO);
39 static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), WORD_WELT);
40 static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), WORD_BOÄH);
41 static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), WORD_MÜSCH);
42 static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), WORD_S);
44 private static String windowStart()
46 return toEpochSecond(windowBoundFor(0));
49 private static String toEpochSecond(Instant instant)
51 return Long.toString(instant.getEpochSecond());
54 private static Instant windowBoundFor(int second)
56 return instantOfSecond(second, 0);
59 private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
64 InputWord.of(PETER, WORD_HALLO)),
68 InputWord.of(KLAUS, WORD_MÜSCH)),
72 InputWord.of(PETER, WORD_WELT)),
76 InputWord.of(KLAUS, WORD_MÜSCH)),
80 InputWord.of(KLAUS, WORD_S)),
84 InputWord.of(PETER, WORD_BOÄH)),
88 InputWord.of(PETER, WORD_WELT)),
92 InputWord.of(PETER, WORD_BOÄH)),
96 InputWord.of(KLAUS, WORD_S)),
100 InputWord.of(PETER, WORD_BOÄH)),
104 InputWord.of(KLAUS, WORD_S)),
106 instantOfSecond((int)WINDOW_SIZE.toSeconds()),
108 InputWord.of(PETER, WORD_HALLO)),
110 instantOfSecond((int)WINDOW_SIZE.toSeconds()),
112 InputWord.of(KLAUS, WORD_MÜSCH)),
114 instantOfSecond((int)WINDOW_SIZE.toSeconds()),
116 InputWord.of(PETER, WORD_WELT)),
118 instantOfSecond((int)WINDOW_SIZE.toSeconds()),
120 InputWord.of(KLAUS, WORD_S)),
122 instantOfSecond((int)WINDOW_SIZE.toSeconds()),
124 InputWord.of(PETER, WORD_BOÄH)),
127 private static Instant instantOfSecond(int second)
129 return instantOfSecond(second, 0);
132 private static Instant instantOfSecond(int second, int naonSeconds)
135 .ofInstant(CLOCK.instant(), CLOCK.getZone())
138 .withNano(naonSeconds)
142 private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
144 return Stream.of(TestData.INPUT_MESSAGES);
147 static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
149 getInputMessages().forEach(message ->
151 log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
152 consumer.accept(message.time, new KeyValue<>(message.key, message.value));
156 static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
158 expectedMessages().forEach(
160 assertThat(receivedMessages.get(word))
161 .containsExactlyElementsOf(counter));
164 static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
166 assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
167 assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
168 assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
169 assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
170 assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
173 private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
175 return messagesForUsers.get(word) == null
177 : messagesForUsers.get(word).size();
180 static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
182 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
183 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
184 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
185 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
186 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
189 private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
191 WindowedWord windowedWord = new WindowedWord();
193 windowedWord.setTime(outputWindowedWord.getTime());
194 windowedWord.setKey(outputWindowedWord.getKey());
199 static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
201 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
202 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
203 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
204 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
205 assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
208 private static void assertWordCountEqualsWordCountFromLastMessage(
209 OutputWindowedWord word,
212 OutputWordCounter outputWordCounter = OutputWordCounter.of(
214 counter.getCounter());
215 assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
218 private static void assertWordCountEqualsWordCountFromLastMessage(
219 OutputWindowedWord word,
220 OutputWordCounter counter)
222 assertThat(counter).isEqualTo(getLastMessageFor(word));
225 private static OutputWordCounter getLastMessageFor(OutputWindowedWord word)
227 return getLastMessageFor(word, expectedMessages());
230 private static OutputWordCounter getLastMessageFor(
231 OutputWindowedWord user,
232 MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForWord)
234 return messagesForWord
237 .reduce(null, (left, right) -> right);
240 private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
244 OutputWordCounter.of(WORD_HALLO,1)),
247 OutputWordCounter.of(WORD_MÜSCH,2)),
250 OutputWordCounter.of(WORD_WELT,2)),
253 OutputWordCounter.of(WORD_BOÄH,3)),
256 OutputWordCounter.of(WORD_S,3)),
259 static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
261 MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
263 .of(EXPECTED_MESSAGES)
264 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
265 return expectedMessages;