-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.splitter.InputUser;
+import de.juplo.kafka.wordcount.splitter.InputWord;
+import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
+import de.juplo.kafka.wordcount.stats.OutputWordCounter;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
static final String WORD_S = "s";
static final String WORD_BOÄH = "Boäh";
- static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
- static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
- static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
- static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
- static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
+ static final OutputWindowedWord PETER_HALLO = OutputWindowedWord.of(PETER, WORD_HALLO);
+ static final OutputWindowedWord PETER_WELT = OutputWindowedWord.of(PETER, WORD_WELT);
+ static final OutputWindowedWord PETER_BOÄH = OutputWindowedWord.of(PETER, WORD_BOÄH);
+ static final OutputWindowedWord KLAUS_MÜSCH = OutputWindowedWord.of(KLAUS, WORD_MÜSCH);
+ static final OutputWindowedWord KLAUS_S = OutputWindowedWord.of(KLAUS, WORD_S);
- private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
+ private static final KeyValue<InputUser, InputWord>[] INPUT_MESSAGES = new KeyValue[]
{
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_HALLO)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_HALLO)),
KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_MÜSCH)),
+ InputUser.of(KLAUS),
+ InputWord.of(KLAUS, WORD_MÜSCH)),
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_WELT)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_WELT)),
KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_MÜSCH)),
+ InputUser.of(KLAUS),
+ InputWord.of(KLAUS, WORD_MÜSCH)),
KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
+ InputUser.of(KLAUS),
+ InputWord.of(KLAUS, WORD_S)),
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_BOÄH)),
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_WELT)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_WELT)),
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_BOÄH)),
KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
+ InputUser.of(KLAUS),
+ InputWord.of(KLAUS, WORD_S)),
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_BOÄH)),
KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
+ InputUser.of(KLAUS),
+ InputWord.of(KLAUS, WORD_S)),
};
- static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
+ static Stream<KeyValue<InputUser, InputWord>> getInputMessages()
{
return Stream.of(TestData.INPUT_MESSAGES);
}
- static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+ static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
{
expectedMessages().forEach(
(word, counter) ->
.containsExactlyElementsOf(counter));
}
- static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+ static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
{
assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
}
- private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
+ private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
{
return messagesForUsers.get(word) == null
? 0
assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
}
- private static Word wordOf(TestOutputWord testOutputWord)
+ private static Word wordOf(OutputWindowedWord testOutputWindowedWord)
{
Word word = new Word();
- word.setUser(testOutputWord.getUser());
- word.setWord(testOutputWord.getWord());
+ word.setUser(testOutputWindowedWord.getUser());
+ word.setWord(testOutputWindowedWord.getWord());
return word;
}
- static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+ static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
{
assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
}
private static void assertWordCountEqualsWordCountFromLastMessage(
- TestOutputWord word,
+ OutputWindowedWord word,
Long counter)
{
- TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
+ OutputWordCounter testOutputWordCounter = OutputWordCounter.of(
word.getUser(),
word.getWord(),
counter);
}
private static void assertWordCountEqualsWordCountFromLastMessage(
- TestOutputWord word,
- TestOutputWordCounter counter)
+ OutputWindowedWord word,
+ OutputWordCounter counter)
{
assertThat(counter).isEqualTo(getLastMessageFor(word));
}
- private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
+ private static OutputWordCounter getLastMessageFor(OutputWindowedWord word)
{
return getLastMessageFor(word, expectedMessages());
}
- private static TestOutputWordCounter getLastMessageFor(
- TestOutputWord user,
- MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
+ private static OutputWordCounter getLastMessageFor(
+ OutputWindowedWord user,
+ MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForWord)
{
return messagesForWord
.get(user)
.reduce(null, (left, right) -> right);
}
- private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
+ private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair(
PETER_HALLO,
- TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
+ OutputWordCounter.of(PETER, WORD_HALLO,1)),
KeyValue.pair(
KLAUS_MÜSCH,
- TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
+ OutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
KeyValue.pair(
PETER_WELT,
- TestOutputWordCounter.of(PETER, WORD_WELT,1)),
+ OutputWordCounter.of(PETER, WORD_WELT,1)),
KeyValue.pair(
KLAUS_MÜSCH,
- TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
+ OutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
KeyValue.pair(
KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,1)),
+ OutputWordCounter.of(KLAUS, WORD_S,1)),
KeyValue.pair(
PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
+ OutputWordCounter.of(PETER, WORD_BOÄH,1)),
KeyValue.pair(
PETER_WELT,
- TestOutputWordCounter.of(PETER, WORD_WELT,2)),
+ OutputWordCounter.of(PETER, WORD_WELT,2)),
KeyValue.pair(
PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
+ OutputWordCounter.of(PETER, WORD_BOÄH,2)),
KeyValue.pair(
KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,2)),
+ OutputWordCounter.of(KLAUS, WORD_S,2)),
KeyValue.pair(
PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
+ OutputWordCounter.of(PETER, WORD_BOÄH,3)),
KeyValue.pair(
KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,3)),
+ OutputWordCounter.of(KLAUS, WORD_S,3)),
};
- static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
+ static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
{
- MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));