await("Expected state")
.atMost(Duration.ofSeconds(5))
.catchUncaughtExceptions()
- .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+ .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user)));
}
@TestConfiguration
.forEach(kv -> top10In.pipeInput(kv.key, kv.value));
KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
- TestData.assertExpectedState(store);
+ TestData.assertExpectedState(user -> store.get(user));
}
@AfterEach
import de.juplo.kafka.wordcount.top10.TestRanking;
import de.juplo.kafka.wordcount.users.TestUserData;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Arrays;
+import java.util.function.Function;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
return Stream.of(USERS_MESSAGES);
}
- static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
+ static void assertExpectedState(Function<String, String> function)
{
- assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
- assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+ assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS));
}
private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)