From: Kai Moritz Date: Sun, 9 Jun 2024 17:35:22 +0000 (+0200) Subject: query: 1.0.6 - Refactored `TestData.assertExpectedState()` X-Git-Tag: query-with-kafkaproducer~10 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=cfd58858b9861e8d0e4c1d30896505a50f63255b;p=demos%2Fkafka%2Fwordcount query: 1.0.6 - Refactored `TestData.assertExpectedState()` --- diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 4e44cda..914aeaf 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -82,7 +82,7 @@ public class QueryApplicationIT await("Expected state") .atMost(Duration.ofSeconds(5)) .catchUncaughtExceptions() - .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user))); } @TestConfiguration diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 8439be1..6bdd8fa 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -65,7 +65,7 @@ public class QueryStreamProcessorTopologyTest .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); - TestData.assertExpectedState(store); + TestData.assertExpectedState(user -> store.get(user)); } @AfterEach diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index 610bca0..3fcd7c9 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -5,9 +5,9 @@ import de.juplo.kafka.wordcount.top10.TestEntry; import de.juplo.kafka.wordcount.top10.TestRanking; import de.juplo.kafka.wordcount.users.TestUserData; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.util.Arrays; +import java.util.function.Function; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -29,10 +29,10 @@ class TestData return Stream.of(USERS_MESSAGES); } - static void assertExpectedState(ReadOnlyKeyValueStore store) + static void assertExpectedState(Function function) { - assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER)); - assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS)); + assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER)); + assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS)); } private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)