top10: 1.2.1 - Added expectations for the number of received messages
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
index 3bb6b54..a4f79ac 100644 (file)
@@ -5,6 +5,7 @@ import de.juplo.kafka.wordcount.counter.TestWord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
@@ -18,41 +19,44 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class TestData
 {
+       static final User PETER = User.of("peter");
+       static final User KLAUS = User.of("klaus");
+
        static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
        {
                        new KeyValue<>(
-                                       TestWord.of("peter","Hallo"),
-                                       TestCounter.of("peter","Hallo",1)),
+                                       TestWord.of(PETER.getUser(),"Hallo"),
+                                       TestCounter.of(PETER.getUser(),"Hallo",1)),
                        new KeyValue<>(
-                                       TestWord.of("klaus","Müsch"),
-                                       TestCounter.of("klaus","Müsch",1)),
+                                       TestWord.of(KLAUS.getUser(),"Müsch"),
+                                       TestCounter.of(KLAUS.getUser(),"Müsch",1)),
                        new KeyValue<>(
-                                       TestWord.of("peter","Welt"),
-                                       TestCounter.of("peter","Welt",1)),
+                                       TestWord.of(PETER.getUser(),"Welt"),
+                                       TestCounter.of(PETER.getUser(),"Welt",1)),
                        new KeyValue<>(
-                                       TestWord.of("klaus","Müsch"),
-                                       TestCounter.of("klaus","Müsch",2)),
+                                       TestWord.of(KLAUS.getUser(),"Müsch"),
+                                       TestCounter.of(KLAUS.getUser(),"Müsch",2)),
                        new KeyValue<>(
-                                       TestWord.of("klaus","s"),
-                                       TestCounter.of("klaus","s",1)),
+                                       TestWord.of(KLAUS.getUser(),"s"),
+                                       TestCounter.of(KLAUS.getUser(),"s",1)),
                        new KeyValue<>(
-                                       TestWord.of("peter","Boäh"),
-                                       TestCounter.of("peter","Boäh",1)),
+                                       TestWord.of(PETER.getUser(),"Boäh"),
+                                       TestCounter.of(PETER.getUser(),"Boäh",1)),
                        new KeyValue<>(
-                                       TestWord.of("peter","Welt"),
-                                       TestCounter.of("peter","Welt",2)),
+                                       TestWord.of(PETER.getUser(),"Welt"),
+                                       TestCounter.of(PETER.getUser(),"Welt",2)),
                        new KeyValue<>(
-                                       TestWord.of("peter","Boäh"),
-                                       TestCounter.of("peter","Boäh",2)),
+                                       TestWord.of(PETER.getUser(),"Boäh"),
+                                       TestCounter.of(PETER.getUser(),"Boäh",2)),
                        new KeyValue<>(
-                                       TestWord.of("klaus","s"),
-                                       TestCounter.of("klaus","s",2)),
+                                       TestWord.of(KLAUS.getUser(),"s"),
+                                       TestCounter.of(KLAUS.getUser(),"s",2)),
                        new KeyValue<>(
-                                       TestWord.of("peter","Boäh"),
-                                       TestCounter.of("peter","Boäh",3)),
+                                       TestWord.of(PETER.getUser(),"Boäh"),
+                                       TestCounter.of(PETER.getUser(),"Boäh",3)),
                        new KeyValue<>(
-                                       TestWord.of("klaus","s"),
-                                       TestCounter.of("klaus","s",3)),
+                                       TestWord.of(KLAUS.getUser(),"s"),
+                                       TestCounter.of(KLAUS.getUser(),"s",3)),
        };
 
        static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
@@ -63,61 +67,103 @@ class TestData
                                                                .containsExactlyElementsOf(rankings));
        }
 
+       static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+       {
+               assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+       }
+
+       static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
+       {
+               assertThat(countMessagesForUser(PETER, receivedMessages));
+               assertThat(countMessagesForUser(KLAUS, receivedMessages));
+       }
+
+       static int countMessagesForUser(User user, MultiValueMap<User, Ranking> messagesForUsers)
+       {
+               return messagesForUsers.get(user).size();
+       }
+
+
+       static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
+       {
+               assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
+       }
+
+       static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
+       {
+               assertThat(ranking).isEqualTo(getLastMessageFor(user));
+       }
+
+       static Ranking getLastMessageFor(User user)
+       {
+               return getLastMessageFor(user, expectedMessages());
+       }
+
+       static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
+       {
+               return messagesForUsers
+                               .get(user)
+                               .stream()
+                               .reduce(null, (left, right) -> right);
+       }
+
        static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair( // 0
-                                       User.of("peter"),
+                                       PETER,
                                        Ranking.of(
                                                        Entry.of("Hallo", 1l))),
                        KeyValue.pair( // 1
-                                       User.of("klaus"),
+                                       KLAUS,
                                        Ranking.of(
                                                        Entry.of("Müsch", 1l))),
                        KeyValue.pair( // 2
-                                       User.of("peter"),
+                                       PETER,
                                        Ranking.of(
                                                        Entry.of("Hallo", 1l),
                                                        Entry.of("Welt", 1l))),
                        KeyValue.pair( // 3
-                                       User.of("klaus"),
+                                       KLAUS,
                                        Ranking.of(
                                                        Entry.of("Müsch", 2l))),
                        KeyValue.pair( // 4
-                                       User.of("klaus"),
+                                       KLAUS,
                                        Ranking.of(
                                                        Entry.of("Müsch", 2l),
                                                        Entry.of("s", 1l))),
                        KeyValue.pair( // 5
-                                       User.of("peter"),
+                                       PETER,
                                        Ranking.of(
                                                        Entry.of("Hallo", 1l),
                                                        Entry.of("Welt", 1l),
                                                        Entry.of("Boäh", 1l))),
                        KeyValue.pair( // 6
-                                       User.of("peter"),
+                                       PETER,
                                        Ranking.of(
                                                        Entry.of("Welt", 2l),
                                                        Entry.of("Hallo", 1l),
                                                        Entry.of("Boäh", 1l))),
                        KeyValue.pair( // 7
-                                       User.of("peter"),
+                                       PETER,
                                        Ranking.of(
                                                        Entry.of("Welt", 2l),
                                                        Entry.of("Boäh", 2l),
                                                        Entry.of("Hallo", 1l))),
                        KeyValue.pair( // 8
-                                       User.of("klaus"),
+                                       KLAUS,
                                        Ranking.of(
                                                        Entry.of("Müsch", 2l),
                                                        Entry.of("s", 2l))),
                        KeyValue.pair( // 9
-                                       User.of("peter"),
+                                       PETER,
                                        Ranking.of(
                                                        Entry.of("Boäh", 3l),
                                                        Entry.of("Welt", 2l),
                                                        Entry.of("Hallo", 1l))),
                        KeyValue.pair( // 10
-                                       User.of("klaus"),
+                                       KLAUS,
                                        Ranking.of(
                                                        Entry.of("s", 3l),
                                                        Entry.of("Müsch", 2l))),