X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTestData.java;h=e0c53df8fa6caade6f8df78c5deb868fcbc74a65;hb=6500abf8a211334e869de7cc2f354368f0263bc1;hp=f6d7ccdfdb4bad1d72454b37e1447770e754fbe9;hpb=b69e1270a308f200b2640a01d37d4636a0a549e1;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index f6d7ccd..e0c53df 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -2,16 +2,11 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -19,41 +14,44 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { + static final User PETER = User.of("peter"); + static final User KLAUS = User.of("klaus"); + static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( - TestWord.of("peter","Hallo"), - TestCounter.of("peter","Hallo",1)), + TestWord.of(PETER.getUser(),"Hallo"), + TestCounter.of(PETER.getUser(),"Hallo",1)), new KeyValue<>( - TestWord.of("klaus","Müsch"), - TestCounter.of("klaus","Müsch",1)), + TestWord.of(KLAUS.getUser(),"Müsch"), + TestCounter.of(KLAUS.getUser(),"Müsch",1)), new KeyValue<>( - TestWord.of("peter","Welt"), - TestCounter.of("peter","Welt",1)), + TestWord.of(PETER.getUser(),"Welt"), + TestCounter.of(PETER.getUser(),"Welt",1)), new KeyValue<>( - TestWord.of("klaus","Müsch"), - TestCounter.of("klaus","Müsch",2)), + TestWord.of(KLAUS.getUser(),"Müsch"), + TestCounter.of(KLAUS.getUser(),"Müsch",2)), new KeyValue<>( - TestWord.of("klaus","s"), - TestCounter.of("klaus","s",1)), + TestWord.of(KLAUS.getUser(),"s"), + TestCounter.of(KLAUS.getUser(),"s",1)), new KeyValue<>( - TestWord.of("peter","Boäh"), - TestCounter.of("peter","Boäh",1)), + TestWord.of(PETER.getUser(),"Boäh"), + TestCounter.of(PETER.getUser(),"Boäh",1)), new KeyValue<>( - TestWord.of("peter","Welt"), - TestCounter.of("peter","Welt",2)), + TestWord.of(PETER.getUser(),"Welt"), + TestCounter.of(PETER.getUser(),"Welt",2)), new KeyValue<>( - TestWord.of("peter","Boäh"), - TestCounter.of("peter","Boäh",2)), + TestWord.of(PETER.getUser(),"Boäh"), + TestCounter.of(PETER.getUser(),"Boäh",2)), new KeyValue<>( - TestWord.of("klaus","s"), - TestCounter.of("klaus","s",2)), + TestWord.of(KLAUS.getUser(),"s"), + TestCounter.of(KLAUS.getUser(),"s",2)), new KeyValue<>( - TestWord.of("peter","Boäh"), - TestCounter.of("peter","Boäh",3)), + TestWord.of(PETER.getUser(),"Boäh"), + TestCounter.of(PETER.getUser(),"Boäh",3)), new KeyValue<>( - TestWord.of("klaus","s"), - TestCounter.of("klaus","s",3)), + TestWord.of(KLAUS.getUser(),"s"), + TestCounter.of(KLAUS.getUser(),"s",3)), }; static void assertExpectedMessages(MultiValueMap receivedMessages) @@ -66,65 +64,101 @@ class TestData static void assertExpectedState(ReadOnlyKeyValueStore store) { - assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value); - assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value); + assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER)); + assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS)); + } + + static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages) + { + assertThat(countMessagesForUser(PETER, receivedMessages)); + assertThat(countMessagesForUser(KLAUS, receivedMessages)); + } + + static int countMessagesForUser(User user, MultiValueMap messagesForUsers) + { + return messagesForUsers.get(user).size(); + } + + + static void assertExpectedLastMessagesForUsers(MultiValueMap receivedMessages) + { + assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages)); + assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages)); + } + + static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking) + { + assertThat(ranking).isEqualTo(getLastMessageFor(user)); + } + + static Ranking getLastMessageFor(User user) + { + return getLastMessageFor(user, expectedMessages()); + } + + static Ranking getLastMessageFor(User user, MultiValueMap messagesForUsers) + { + return messagesForUsers + .get(user) + .stream() + .reduce(null, (left, right) -> right); } static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 - User.of("peter"), + PETER, Ranking.of( Entry.of("Hallo", 1l))), KeyValue.pair( // 1 - User.of("klaus"), + KLAUS, Ranking.of( Entry.of("Müsch", 1l))), KeyValue.pair( // 2 - User.of("peter"), + PETER, Ranking.of( Entry.of("Hallo", 1l), Entry.of("Welt", 1l))), KeyValue.pair( // 3 - User.of("klaus"), + KLAUS, Ranking.of( Entry.of("Müsch", 2l))), KeyValue.pair( // 4 - User.of("klaus"), + KLAUS, Ranking.of( Entry.of("Müsch", 2l), Entry.of("s", 1l))), KeyValue.pair( // 5 - User.of("peter"), + PETER, Ranking.of( Entry.of("Hallo", 1l), Entry.of("Welt", 1l), Entry.of("Boäh", 1l))), KeyValue.pair( // 6 - User.of("peter"), + PETER, Ranking.of( Entry.of("Welt", 2l), Entry.of("Hallo", 1l), Entry.of("Boäh", 1l))), KeyValue.pair( // 7 - User.of("peter"), + PETER, Ranking.of( Entry.of("Welt", 2l), Entry.of("Boäh", 2l), Entry.of("Hallo", 1l))), KeyValue.pair( // 8 - User.of("klaus"), + KLAUS, Ranking.of( Entry.of("Müsch", 2l), Entry.of("s", 2l))), KeyValue.pair( // 9 - User.of("peter"), + PETER, Ranking.of( Entry.of("Boäh", 3l), Entry.of("Welt", 2l), Entry.of("Hallo", 1l))), KeyValue.pair( // 10 - User.of("klaus"), + KLAUS, Ranking.of( Entry.of("s", 3l), Entry.of("Müsch", 2l))), @@ -138,29 +172,4 @@ class TestData .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); return expectedMessages; } - - static Map convertToMap(Properties properties) - { - return properties - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> (String)entry.getKey(), - entry -> entry.getValue() - )); - } - - static String parseHeader(Headers headers, String key) - { - Header header = headers.lastHeader(key); - if (header == null) - { - return key + "=null"; - } - else - { - return key + "=" + new String(header.value()); - } - } }