]> juplo.de Git - demos/kafka/wordcount/commitdiff
query: 1.0.6 - Refactored `TestData.assertExpectedState()`
authorKai Moritz <kai@juplo.de>
Sun, 9 Jun 2024 17:35:22 +0000 (19:35 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 12 Jun 2024 20:24:58 +0000 (22:24 +0200)
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/query/TestData.java

index 4e44cdaa2f4d7524ad6dd1eea57a157dc9bde1e3..914aeaf35f0bdf4e419a7b5e9877c61b76fa1a83 100644 (file)
@@ -82,7 +82,7 @@ public class QueryApplicationIT
                await("Expected state")
                                .atMost(Duration.ofSeconds(5))
                                .catchUncaughtExceptions()
-                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+                               .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user)));
        }
 
        @TestConfiguration
index 8439be17ae94878505f9cae2f38d18a8cd299195..6bdd8fa402460759f0dc9e45061a26b7c7244f20 100644 (file)
@@ -65,7 +65,7 @@ public class QueryStreamProcessorTopologyTest
         .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
 
     KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
-    TestData.assertExpectedState(store);
+    TestData.assertExpectedState(user -> store.get(user));
   }
 
   @AfterEach
index 610bca04c05892a37641e79eb15ad1b6ca447d7f..3fcd7c9638baca6c839238762a2a74b112ae9513 100644 (file)
@@ -5,9 +5,9 @@ import de.juplo.kafka.wordcount.top10.TestEntry;
 import de.juplo.kafka.wordcount.top10.TestRanking;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 import java.util.Arrays;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -29,10 +29,10 @@ class TestData
                return Stream.of(USERS_MESSAGES);
        }
 
-       static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
+       static void assertExpectedState(Function<String, String> function)
        {
-               assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
-               assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+               assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS));
        }
 
        private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)