projects
/
demos
/
kafka
/
wordcount
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
2b22a00
)
query: 1.0.6 - Refactored `TestData.assertExpectedState()`
author
Kai Moritz
<kai@juplo.de>
Sun, 9 Jun 2024 17:35:22 +0000
(19:35 +0200)
committer
Kai Moritz
<kai@juplo.de>
Wed, 12 Jun 2024 20:24:58 +0000
(22:24 +0200)
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/query/TestData.java
patch
|
blob
|
history
diff --git
a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
index
4e44cda
..
914aeaf
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
@@
-82,7
+82,7
@@
public class QueryApplicationIT
await("Expected state")
.atMost(Duration.ofSeconds(5))
.catchUncaughtExceptions()
await("Expected state")
.atMost(Duration.ofSeconds(5))
.catchUncaughtExceptions()
- .untilAsserted(() -> TestData.assertExpectedState(
streamProcessor.getStore(
)));
+ .untilAsserted(() -> TestData.assertExpectedState(
user -> streamProcessor.getStore().get(user
)));
}
@TestConfiguration
}
@TestConfiguration
diff --git
a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
index
8439be1
..
6bdd8fa
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
@@
-65,7
+65,7
@@
public class QueryStreamProcessorTopologyTest
.forEach(kv -> top10In.pipeInput(kv.key, kv.value));
KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
.forEach(kv -> top10In.pipeInput(kv.key, kv.value));
KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
- TestData.assertExpectedState(
store
);
+ TestData.assertExpectedState(
user -> store.get(user)
);
}
@AfterEach
}
@AfterEach
diff --git
a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java
b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java
index
610bca0
..
3fcd7c9
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/query/TestData.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/query/TestData.java
@@
-5,9
+5,9
@@
import de.juplo.kafka.wordcount.top10.TestEntry;
import de.juplo.kafka.wordcount.top10.TestRanking;
import de.juplo.kafka.wordcount.users.TestUserData;
import org.apache.kafka.streams.KeyValue;
import de.juplo.kafka.wordcount.top10.TestRanking;
import de.juplo.kafka.wordcount.users.TestUserData;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Arrays;
import java.util.Arrays;
+import java.util.function.Function;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@@
-29,10
+29,10
@@
class TestData
return Stream.of(USERS_MESSAGES);
}
return Stream.of(USERS_MESSAGES);
}
- static void assertExpectedState(
ReadOnlyKeyValueStore<String, String> store
)
+ static void assertExpectedState(
Function<String, String> function
)
{
{
- assertRankingEqualsRankingFromLastMessage(PETER,
store.get
(PETER));
- assertRankingEqualsRankingFromLastMessage(KLAUS,
store.get
(KLAUS));
+ assertRankingEqualsRankingFromLastMessage(PETER,
function.apply
(PETER));
+ assertRankingEqualsRankingFromLastMessage(KLAUS,
function.apply
(KLAUS));
}
private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
}
private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)