popular: 1.0.0 - Word are counted for hopping time-windows
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / TestData.java
index 892ea81..2634f5e 100644 (file)
@@ -4,18 +4,28 @@ import de.juplo.kafka.wordcount.splitter.InputUser;
 import de.juplo.kafka.wordcount.splitter.InputWord;
 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
+@Slf4j
 class TestData
 {
+       static final Clock CLOCK = Clock.fixed(
+                       Clock.systemDefaultZone().instant(),
+                       Clock.systemDefaultZone().getZone());
        static final String PETER = "peter";
        static final String KLAUS = "klaus";
 
@@ -25,54 +35,103 @@ class TestData
        static final String WORD_S = "s";
        static final String WORD_BOÄH = "Boäh";
 
-       static final OutputWindowedWord PETER_HALLO = OutputWindowedWord.of(PETER, WORD_HALLO);
-       static final OutputWindowedWord PETER_WELT = OutputWindowedWord.of(PETER, WORD_WELT);
-       static final OutputWindowedWord PETER_BOÄH = OutputWindowedWord.of(PETER, WORD_BOÄH);
-       static final OutputWindowedWord KLAUS_MÜSCH = OutputWindowedWord.of(KLAUS, WORD_MÜSCH);
-       static final OutputWindowedWord KLAUS_S = OutputWindowedWord.of(KLAUS, WORD_S);
+       static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
+       static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
+       static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
+       static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
+       static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
 
-       private static final KeyValue<InputUser, InputWord>[] INPUT_MESSAGES = new KeyValue[]
+       private static Instant windowStart()
        {
-                       KeyValue.pair(
+               return windowBoundFor(CLOCK.instant(), 0);
+       }
+
+       private static Instant windowEnd()
+       {
+               return windowBoundFor(CLOCK.instant(), WINDOW_SIZE.toSecondsPart());
+       }
+
+       private static Instant windowBoundFor(Instant instant, int second)
+       {
+               return instantOfSecond(second, 0);
+       }
+
+       private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
+       {
+                       TestMessage.of(
+                                       instantOfSecond(0),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_HALLO)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(13),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_MÜSCH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(0),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_WELT)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_MÜSCH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_S)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_WELT)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(15),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(15),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_S)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(29),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(20),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_S)),
        };
 
-       static Stream<KeyValue<InputUser, InputWord>> getInputMessages()
+       private static Instant instantOfSecond(int second)
+       {
+               return instantOfSecond(second, 0);
+       }
+
+       private static Instant instantOfSecond(int second, int naonSeconds)
+       {
+               return ZonedDateTime
+                               .ofInstant(CLOCK.instant(), CLOCK.getZone())
+                               .withSecond(second)
+                               .withNano(naonSeconds)
+                               .toInstant();
+       }
+
+       private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
        {
                return Stream.of(TestData.INPUT_MESSAGES);
        }
 
+       static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
+       {
+               getInputMessages().forEach(message ->
+               {
+                       log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
+                       consumer.accept(message.time, new KeyValue<>(message.key, message.value));
+               });
+       }
+
        static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
                expectedMessages().forEach(
@@ -83,11 +142,11 @@ class TestData
 
        static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
-               assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
-               assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
-               assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
-               assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
-               assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
        }
 
        private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
@@ -97,43 +156,43 @@ class TestData
                                : messagesForUsers.get(word).size();
        }
 
-       static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+       static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
        {
-               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
        }
 
-       private static Word wordOf(OutputWindowedWord testOutputWindowedWord)
+       private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
        {
-               Word word = new Word();
+               WindowedWord windowedWord = new WindowedWord();
 
-               word.setUser(testOutputWindowedWord.getUser());
-               word.setWord(testOutputWindowedWord.getWord());
+               windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
+               windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
+               windowedWord.setWord(outputWindowedWord.getWord());
 
-               return word;
+               return windowedWord;
        }
 
        static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
-               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
        }
 
        private static void assertWordCountEqualsWordCountFromLastMessage(
                        OutputWindowedWord word,
-                       Long counter)
+                       WordCounter counter)
        {
-               OutputWordCounter testOutputWordCounter = OutputWordCounter.of(
-                               word.getUser(),
+               OutputWordCounter outputWordCounter = OutputWordCounter.of(
                                word.getWord(),
-                               counter);
-               assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+                               counter.getCounter());
+               assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
        }
 
        private static void assertWordCountEqualsWordCountFromLastMessage(
@@ -161,38 +220,38 @@ class TestData
        private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair(
-                                       PETER_HALLO,
-                                       OutputWordCounter.of(PETER, WORD_HALLO,1)),
+                                       WINDOWED_WORD_HALLO,
+                                       OutputWordCounter.of(WORD_HALLO,1)),
                        KeyValue.pair(
-                                       KLAUS_MÜSCH,
-                                       OutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
+                                       WINDOWED_WORD_MÜSCH,
+                                       OutputWordCounter.of(WORD_MÜSCH,1)),
                        KeyValue.pair(
-                                       PETER_WELT,
-                                       OutputWordCounter.of(PETER, WORD_WELT,1)),
+                                       WINDOWED_WORD_WELT,
+                                       OutputWordCounter.of(WORD_WELT,1)),
                        KeyValue.pair(
-                                       KLAUS_MÜSCH,
-                                       OutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
+                                       WINDOWED_WORD_MÜSCH,
+                                       OutputWordCounter.of(WORD_MÜSCH,2)),
                        KeyValue.pair(
-                                       KLAUS_S,
-                                       OutputWordCounter.of(KLAUS, WORD_S,1)),
+                                       WINDOWED_WORD_S,
+                                       OutputWordCounter.of(WORD_S,1)),
                        KeyValue.pair(
-                                       PETER_BOÄH,
-                                       OutputWordCounter.of(PETER, WORD_BOÄH,1)),
+                                       WINDOWED_WORD_BOÄH,
+                                       OutputWordCounter.of(WORD_BOÄH,1)),
                        KeyValue.pair(
-                                       PETER_WELT,
-                                       OutputWordCounter.of(PETER, WORD_WELT,2)),
+                                       WINDOWED_WORD_WELT,
+                                       OutputWordCounter.of(WORD_WELT,2)),
                        KeyValue.pair(
-                                       PETER_BOÄH,
-                                       OutputWordCounter.of(PETER, WORD_BOÄH,2)),
+                                       WINDOWED_WORD_BOÄH,
+                                       OutputWordCounter.of(WORD_BOÄH,2)),
                        KeyValue.pair(
-                                       KLAUS_S,
-                                       OutputWordCounter.of(KLAUS, WORD_S,2)),
+                                       WINDOWED_WORD_S,
+                                       OutputWordCounter.of(WORD_S,2)),
                        KeyValue.pair(
-                                       PETER_BOÄH,
-                                       OutputWordCounter.of(PETER, WORD_BOÄH,3)),
+                                       WINDOWED_WORD_BOÄH,
+                                       OutputWordCounter.of(WORD_BOÄH,3)),
                        KeyValue.pair(
-                                       KLAUS_S,
-                                       OutputWordCounter.of(KLAUS, WORD_S,3)),
+                                       WINDOWED_WORD_S,
+                                       OutputWordCounter.of(WORD_S,3)),
        };
 
        static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()