top10: 1.2.1 - Refined `TestData` clearified concerns
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
index 73a405e..7a3a27e 100644 (file)
@@ -2,15 +2,15 @@ package de.juplo.kafka.wordcount.top10;
 
 import de.juplo.kafka.wordcount.counter.TestCounter;
 import de.juplo.kafka.wordcount.counter.TestWord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
+import de.juplo.kafka.wordcount.query.TestEntry;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
+import java.util.Arrays;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -18,44 +18,52 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class TestData
 {
-       static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
+       static final TestUser PETER = TestUser.of("peter");
+       static final TestUser KLAUS = TestUser.of("klaus");
+
+       static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
+       {
+               return Stream.of(INPUT_MESSAGES);
+       }
+
+       private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
        {
                        new KeyValue<>(
-                                       TestWord.of("peter","Hallo"),
-                                       TestCounter.of("peter","Hallo",1)),
+                                       TestWord.of(PETER.getUser(),"Hallo"),
+                                       TestCounter.of(PETER.getUser(),"Hallo",1)),
                        new KeyValue<>(
-                                       TestWord.of("klaus","Müsch"),
-                                       TestCounter.of("klaus","Müsch",1)),
+                                       TestWord.of(KLAUS.getUser(),"Müsch"),
+                                       TestCounter.of(KLAUS.getUser(),"Müsch",1)),
                        new KeyValue<>(
-                                       TestWord.of("peter","Welt"),
-                                       TestCounter.of("peter","Welt",1)),
+                                       TestWord.of(PETER.getUser(),"Welt"),
+                                       TestCounter.of(PETER.getUser(),"Welt",1)),
                        new KeyValue<>(
-                                       TestWord.of("klaus","Müsch"),
-                                       TestCounter.of("klaus","Müsch",2)),
+                                       TestWord.of(KLAUS.getUser(),"Müsch"),
+                                       TestCounter.of(KLAUS.getUser(),"Müsch",2)),
                        new KeyValue<>(
-                                       TestWord.of("klaus","s"),
-                                       TestCounter.of("klaus","s",1)),
+                                       TestWord.of(KLAUS.getUser(),"s"),
+                                       TestCounter.of(KLAUS.getUser(),"s",1)),
                        new KeyValue<>(
-                                       TestWord.of("peter","Boäh"),
-                                       TestCounter.of("peter","Boäh",1)),
+                                       TestWord.of(PETER.getUser(),"Boäh"),
+                                       TestCounter.of(PETER.getUser(),"Boäh",1)),
                        new KeyValue<>(
-                                       TestWord.of("peter","Welt"),
-                                       TestCounter.of("peter","Welt",2)),
+                                       TestWord.of(PETER.getUser(),"Welt"),
+                                       TestCounter.of(PETER.getUser(),"Welt",2)),
                        new KeyValue<>(
-                                       TestWord.of("peter","Boäh"),
-                                       TestCounter.of("peter","Boäh",2)),
+                                       TestWord.of(PETER.getUser(),"Boäh"),
+                                       TestCounter.of(PETER.getUser(),"Boäh",2)),
                        new KeyValue<>(
-                                       TestWord.of("klaus","s"),
-                                       TestCounter.of("klaus","s",2)),
+                                       TestWord.of(KLAUS.getUser(),"s"),
+                                       TestCounter.of(KLAUS.getUser(),"s",2)),
                        new KeyValue<>(
-                                       TestWord.of("peter","Boäh"),
-                                       TestCounter.of("peter","Boäh",3)),
+                                       TestWord.of(PETER.getUser(),"Boäh"),
+                                       TestCounter.of(PETER.getUser(),"Boäh",3)),
                        new KeyValue<>(
-                                       TestWord.of("klaus","s"),
-                                       TestCounter.of("klaus","s",3)),
+                                       TestWord.of(KLAUS.getUser(),"s"),
+                                       TestCounter.of(KLAUS.getUser(),"s",3)),
        };
 
-       static void assertExpectedMessages(MultiValueMap<String, Ranking> receivedMessages)
+       static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
        {
                expectedMessages().forEach(
                                (user, rankings) ->
@@ -63,97 +71,139 @@ class TestData
                                                                .containsExactlyElementsOf(rankings));
        }
 
-       static KeyValue<String, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
+       static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+       {
+               assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
+       }
+
+       private static User userOf(TestUser user)
+       {
+               return User.of(user.getUser());
+       }
+
+       static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
+       {
+               assertThat(countMessagesForUser(PETER, receivedMessages));
+               assertThat(countMessagesForUser(KLAUS, receivedMessages));
+       }
+
+       private static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+       {
+               return messagesForUsers.get(user) == null
+                               ? 0
+                               : messagesForUsers.get(user).size();
+       }
+
+
+       static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
+       {
+               assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
+       }
+
+       private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
+       {
+               TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
+               assertRankingEqualsRankingFromLastMessage(user, testRanking);
+       }
+
+       private static TestEntry[] testEntriesOf(Entry... entries)
+       {
+               return Arrays
+                               .stream(entries)
+                               .map(entry -> TestEntry.of(
+                                               entry.getWord(),
+                                               entry.getCounter() == null
+                                                               ? -1l
+                                                               : entry.getCounter()))
+                               .toArray(size -> new TestEntry[size]);
+       }
+
+       private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
+       {
+               assertThat(ranking).isEqualTo(getLastMessageFor(user));
+       }
+
+       private static TestRanking getLastMessageFor(TestUser user)
+       {
+               return getLastMessageFor(user, expectedMessages());
+       }
+
+       private static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+       {
+               return messagesForUsers
+                               .get(user)
+                               .stream()
+                               .reduce(null, (left, right) -> right);
+       }
+
+       private static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair( // 0
-                                       "peter",
-                                       Ranking.of(
-                                                       Entry.of("Hallo", 1l))),
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l))),
                        KeyValue.pair( // 1
-                                       "klaus",
-                                       Ranking.of(
-                                                       Entry.of("Müsch", 1l))),
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 1l))),
                        KeyValue.pair( // 2
-                                       "peter",
-                                       Ranking.of(
-                                                       Entry.of("Hallo", 1l),
-                                                       Entry.of("Welt", 1l))),
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Welt", 1l))),
                        KeyValue.pair( // 3
-                                       "klaus",
-                                       Ranking.of(
-                                                       Entry.of("Müsch", 2l))),
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l))),
                        KeyValue.pair( // 4
-                                       "klaus",
-                                       Ranking.of(
-                                                       Entry.of("Müsch", 2l),
-                                                       Entry.of("s", 1l))),
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l),
+                                                       TestEntry.of("s", 1l))),
                        KeyValue.pair( // 5
-                                       "peter",
-                                       Ranking.of(
-                                                       Entry.of("Hallo", 1l),
-                                                       Entry.of("Welt", 1l),
-                                                       Entry.of("Boäh", 1l))),
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Welt", 1l),
+                                                       TestEntry.of("Boäh", 1l))),
                        KeyValue.pair( // 6
-                                       "peter",
-                                       Ranking.of(
-                                                       Entry.of("Welt", 2l),
-                                                       Entry.of("Hallo", 1l),
-                                                       Entry.of("Boäh", 1l))),
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Boäh", 1l))),
                        KeyValue.pair( // 7
-                                       "peter",
-                                       Ranking.of(
-                                                       Entry.of("Welt", 2l),
-                                                       Entry.of("Boäh", 2l),
-                                                       Entry.of("Hallo", 1l))),
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Boäh", 2l),
+                                                       TestEntry.of("Hallo", 1l))),
                        KeyValue.pair( // 8
-                                       "klaus",
-                                       Ranking.of(
-                                                       Entry.of("Müsch", 2l),
-                                                       Entry.of("s", 2l))),
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l),
+                                                       TestEntry.of("s", 2l))),
                        KeyValue.pair( // 9
-                                       "peter",
-                                       Ranking.of(
-                                                       Entry.of("Boäh", 3l),
-                                                       Entry.of("Welt", 2l),
-                                                       Entry.of("Hallo", 1l))),
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Boäh", 3l),
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Hallo", 1l))),
                        KeyValue.pair( // 10
-                                       "klaus",
-                                       Ranking.of(
-                                                       Entry.of("s", 3l),
-                                                       Entry.of("Müsch", 2l))),
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("s", 3l),
+                                                       TestEntry.of("Müsch", 2l))),
        };
 
-       static MultiValueMap<String, Ranking> expectedMessages()
+       private static MultiValueMap<TestUser, TestRanking> expectedMessages()
        {
-               MultiValueMap<String, Ranking> expectedMessages = new LinkedMultiValueMap<>();
+               MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
                Stream
                                .of(EXPECTED_MESSAGES)
                                .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
                return expectedMessages;
        }
-
-       static Map<String, Object> convertToMap(Properties properties)
-       {
-               return properties
-                               .entrySet()
-                               .stream()
-                               .collect(
-                                               Collectors.toMap(
-                                                               entry -> (String)entry.getKey(),
-                                                               entry -> entry.getValue()
-                                               ));
-       }
-
-       static String parseHeader(Headers headers, String key)
-       {
-               Header header = headers.lastHeader(key);
-               if (header == null)
-               {
-                       return key + "=null";
-               }
-               else
-               {
-                       return key + "=" + new String(header.value());
-               }
-       }
 }