top10: 1.2.1 - (GREEN) Made the aggregation state accessible
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
index 73a405e..f6d7ccd 100644 (file)
@@ -5,6 +5,7 @@ import de.juplo.kafka.wordcount.counter.TestWord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
@@ -55,7 +56,7 @@ class TestData
                                        TestCounter.of("klaus","s",3)),
        };
 
-       static void assertExpectedMessages(MultiValueMap<String, Ranking> receivedMessages)
+       static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
        {
                expectedMessages().forEach(
                                (user, rankings) ->
@@ -63,69 +64,75 @@ class TestData
                                                                .containsExactlyElementsOf(rankings));
        }
 
-       static KeyValue<String, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
+       static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+       {
+               assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
+               assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
+       }
+
+       static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair( // 0
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Hallo", 1l))),
                        KeyValue.pair( // 1
-                                       "klaus",
+                                       User.of("klaus"),
                                        Ranking.of(
                                                        Entry.of("Müsch", 1l))),
                        KeyValue.pair( // 2
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Hallo", 1l),
                                                        Entry.of("Welt", 1l))),
                        KeyValue.pair( // 3
-                                       "klaus",
+                                       User.of("klaus"),
                                        Ranking.of(
                                                        Entry.of("Müsch", 2l))),
                        KeyValue.pair( // 4
-                                       "klaus",
+                                       User.of("klaus"),
                                        Ranking.of(
                                                        Entry.of("Müsch", 2l),
                                                        Entry.of("s", 1l))),
                        KeyValue.pair( // 5
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Hallo", 1l),
                                                        Entry.of("Welt", 1l),
                                                        Entry.of("Boäh", 1l))),
                        KeyValue.pair( // 6
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Welt", 2l),
                                                        Entry.of("Hallo", 1l),
                                                        Entry.of("Boäh", 1l))),
                        KeyValue.pair( // 7
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Welt", 2l),
                                                        Entry.of("Boäh", 2l),
                                                        Entry.of("Hallo", 1l))),
                        KeyValue.pair( // 8
-                                       "klaus",
+                                       User.of("klaus"),
                                        Ranking.of(
                                                        Entry.of("Müsch", 2l),
                                                        Entry.of("s", 2l))),
                        KeyValue.pair( // 9
-                                       "peter",
+                                       User.of("peter"),
                                        Ranking.of(
                                                        Entry.of("Boäh", 3l),
                                                        Entry.of("Welt", 2l),
                                                        Entry.of("Hallo", 1l))),
                        KeyValue.pair( // 10
-                                       "klaus",
+                                       User.of("klaus"),
                                        Ranking.of(
                                                        Entry.of("s", 3l),
                                                        Entry.of("Müsch", 2l))),
        };
 
-       static MultiValueMap<String, Ranking> expectedMessages()
+       static MultiValueMap<User, Ranking> expectedMessages()
        {
-               MultiValueMap<String, Ranking> expectedMessages = new LinkedMultiValueMap<>();
+               MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
                Stream
                                .of(EXPECTED_MESSAGES)
                                .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));