top10: 1.2.1 - Removed logging of type-headers in tests
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
index d8c603a..e0c53df 100644 (file)
@@ -2,16 +2,11 @@ package de.juplo.kafka.wordcount.top10;
 
 import de.juplo.kafka.wordcount.counter.TestCounter;
 import de.juplo.kafka.wordcount.counter.TestWord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -69,8 +64,44 @@ class TestData
 
        static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
        {
-               assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
-               assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
+               assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+       }
+
+       static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
+       {
+               assertThat(countMessagesForUser(PETER, receivedMessages));
+               assertThat(countMessagesForUser(KLAUS, receivedMessages));
+       }
+
+       static int countMessagesForUser(User user, MultiValueMap<User, Ranking> messagesForUsers)
+       {
+               return messagesForUsers.get(user).size();
+       }
+
+
+       static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
+       {
+               assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
+       }
+
+       static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
+       {
+               assertThat(ranking).isEqualTo(getLastMessageFor(user));
+       }
+
+       static Ranking getLastMessageFor(User user)
+       {
+               return getLastMessageFor(user, expectedMessages());
+       }
+
+       static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
+       {
+               return messagesForUsers
+                               .get(user)
+                               .stream()
+                               .reduce(null, (left, right) -> right);
        }
 
        static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
@@ -141,29 +172,4 @@ class TestData
                                .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
                return expectedMessages;
        }
-
-       static Map<String, Object> convertToMap(Properties properties)
-       {
-               return properties
-                               .entrySet()
-                               .stream()
-                               .collect(
-                                               Collectors.toMap(
-                                                               entry -> (String)entry.getKey(),
-                                                               entry -> entry.getValue()
-                                               ));
-       }
-
-       static String parseHeader(Headers headers, String key)
-       {
-               Header header = headers.lastHeader(key);
-               if (header == null)
-               {
-                       return key + "=null";
-               }
-               else
-               {
-                       return key + "=" + new String(header.value());
-               }
-       }
 }