X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fpopular%2FTestData.java;h=2634f5e41fb35278e78f82330fbb3a77f1b97d12;hb=071bfe7637dd02088b0c6b22ad561b8b50677cf6;hp=892ea8150d6b8dc898272840966f5efaf8f024a7;hpb=a7d74628ddfe72ef2e03e9ae07244ad6e25d91b8;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java index 892ea81..2634f5e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -4,18 +4,28 @@ import de.juplo.kafka.wordcount.splitter.InputUser; import de.juplo.kafka.wordcount.splitter.InputWord; import de.juplo.kafka.wordcount.stats.OutputWindowedWord; import de.juplo.kafka.wordcount.stats.OutputWordCounter; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import java.time.Clock; +import java.time.Instant; +import java.time.ZonedDateTime; +import java.util.function.BiConsumer; import java.util.stream.Stream; +import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE; import static org.assertj.core.api.Assertions.assertThat; +@Slf4j class TestData { + static final Clock CLOCK = Clock.fixed( + Clock.systemDefaultZone().instant(), + Clock.systemDefaultZone().getZone()); static final String PETER = "peter"; static final String KLAUS = "klaus"; @@ -25,54 +35,103 @@ class TestData static final String WORD_S = "s"; static final String WORD_BOÄH = "Boäh"; - static final OutputWindowedWord PETER_HALLO = OutputWindowedWord.of(PETER, WORD_HALLO); - static final OutputWindowedWord PETER_WELT = OutputWindowedWord.of(PETER, WORD_WELT); - static final OutputWindowedWord PETER_BOÄH = OutputWindowedWord.of(PETER, WORD_BOÄH); - static final OutputWindowedWord KLAUS_MÜSCH = OutputWindowedWord.of(KLAUS, WORD_MÜSCH); - static final OutputWindowedWord KLAUS_S = OutputWindowedWord.of(KLAUS, WORD_S); + static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO); + static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT); + static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH); + static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH); + static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S); - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + private static Instant windowStart() { - KeyValue.pair( + return windowBoundFor(CLOCK.instant(), 0); + } + + private static Instant windowEnd() + { + return windowBoundFor(CLOCK.instant(), WINDOW_SIZE.toSecondsPart()); + } + + private static Instant windowBoundFor(Instant instant, int second) + { + return instantOfSecond(second, 0); + } + + private static final TestMessage[] INPUT_MESSAGES = new TestMessage[] + { + TestMessage.of( + instantOfSecond(0), InputUser.of(PETER), InputWord.of(PETER, WORD_HALLO)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(13), InputUser.of(KLAUS), InputWord.of(KLAUS, WORD_MÜSCH)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(0), InputUser.of(PETER), InputWord.of(PETER, WORD_WELT)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(14), InputUser.of(KLAUS), InputWord.of(KLAUS, WORD_MÜSCH)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(14), InputUser.of(KLAUS), InputWord.of(KLAUS, WORD_S)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(14), InputUser.of(PETER), InputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(14), InputUser.of(PETER), InputWord.of(PETER, WORD_WELT)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(15), InputUser.of(PETER), InputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(15), InputUser.of(KLAUS), InputWord.of(KLAUS, WORD_S)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(29), InputUser.of(PETER), InputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( + TestMessage.of( + instantOfSecond(20), InputUser.of(KLAUS), InputWord.of(KLAUS, WORD_S)), }; - static Stream> getInputMessages() + private static Instant instantOfSecond(int second) + { + return instantOfSecond(second, 0); + } + + private static Instant instantOfSecond(int second, int naonSeconds) + { + return ZonedDateTime + .ofInstant(CLOCK.instant(), CLOCK.getZone()) + .withSecond(second) + .withNano(naonSeconds) + .toInstant(); + } + + private static Stream> getInputMessages() { return Stream.of(TestData.INPUT_MESSAGES); } + static void sendInputMessages(BiConsumer> consumer) + { + getInputMessages().forEach(message -> + { + log.info("Sending@{}: {} -> {}", message.time, message.key, message.value); + consumer.accept(message.time, new KeyValue<>(message.key, message.value)); + }); + } + static void assertExpectedMessages(MultiValueMap receivedMessages) { expectedMessages().forEach( @@ -83,11 +142,11 @@ class TestData static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) { - assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); - assertThat(countMessagesForWord(PETER_WELT, receivedMessages)); - assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages)); - assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages)); - assertThat(countMessagesForWord(KLAUS_S, receivedMessages)); + assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages)); + assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages)); + assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages)); + assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages)); + assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages)); } private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap messagesForUsers) @@ -97,43 +156,43 @@ class TestData : messagesForUsers.get(word).size(); } - static void assertExpectedState(ReadOnlyKeyValueStore store) + static void assertExpectedState(ReadOnlyKeyValueStore store) { - assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); - assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT))); - assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH))); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH))); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH))); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S))); } - private static Word wordOf(OutputWindowedWord testOutputWindowedWord) + private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord) { - Word word = new Word(); + WindowedWord windowedWord = new WindowedWord(); - word.setUser(testOutputWindowedWord.getUser()); - word.setWord(testOutputWindowedWord.getWord()); + windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone())); + windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone())); + windowedWord.setWord(outputWindowedWord.getWord()); - return word; + return windowedWord; } static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) { - assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages)); } private static void assertWordCountEqualsWordCountFromLastMessage( OutputWindowedWord word, - Long counter) + WordCounter counter) { - OutputWordCounter testOutputWordCounter = OutputWordCounter.of( - word.getUser(), + OutputWordCounter outputWordCounter = OutputWordCounter.of( word.getWord(), - counter); - assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); + counter.getCounter()); + assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter); } private static void assertWordCountEqualsWordCountFromLastMessage( @@ -161,38 +220,38 @@ class TestData private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( - PETER_HALLO, - OutputWordCounter.of(PETER, WORD_HALLO,1)), + WINDOWED_WORD_HALLO, + OutputWordCounter.of(WORD_HALLO,1)), KeyValue.pair( - KLAUS_MÜSCH, - OutputWordCounter.of(KLAUS, WORD_MÜSCH,1)), + WINDOWED_WORD_MÜSCH, + OutputWordCounter.of(WORD_MÜSCH,1)), KeyValue.pair( - PETER_WELT, - OutputWordCounter.of(PETER, WORD_WELT,1)), + WINDOWED_WORD_WELT, + OutputWordCounter.of(WORD_WELT,1)), KeyValue.pair( - KLAUS_MÜSCH, - OutputWordCounter.of(KLAUS, WORD_MÜSCH,2)), + WINDOWED_WORD_MÜSCH, + OutputWordCounter.of(WORD_MÜSCH,2)), KeyValue.pair( - KLAUS_S, - OutputWordCounter.of(KLAUS, WORD_S,1)), + WINDOWED_WORD_S, + OutputWordCounter.of(WORD_S,1)), KeyValue.pair( - PETER_BOÄH, - OutputWordCounter.of(PETER, WORD_BOÄH,1)), + WINDOWED_WORD_BOÄH, + OutputWordCounter.of(WORD_BOÄH,1)), KeyValue.pair( - PETER_WELT, - OutputWordCounter.of(PETER, WORD_WELT,2)), + WINDOWED_WORD_WELT, + OutputWordCounter.of(WORD_WELT,2)), KeyValue.pair( - PETER_BOÄH, - OutputWordCounter.of(PETER, WORD_BOÄH,2)), + WINDOWED_WORD_BOÄH, + OutputWordCounter.of(WORD_BOÄH,2)), KeyValue.pair( - KLAUS_S, - OutputWordCounter.of(KLAUS, WORD_S,2)), + WINDOWED_WORD_S, + OutputWordCounter.of(WORD_S,2)), KeyValue.pair( - PETER_BOÄH, - OutputWordCounter.of(PETER, WORD_BOÄH,3)), + WINDOWED_WORD_BOÄH, + OutputWordCounter.of(WORD_BOÄH,3)), KeyValue.pair( - KLAUS_S, - OutputWordCounter.of(KLAUS, WORD_S,3)), + WINDOWED_WORD_S, + OutputWordCounter.of(WORD_S,3)), }; static MultiValueMap expectedMessages()