import de.juplo.kafka.wordcount.top10.TestRanking;
import de.juplo.kafka.wordcount.users.TestUserData;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Arrays;
+import java.util.function.Function;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
return Stream.of(USERS_MESSAGES);
}
- static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
+ static void assertExpectedState(Function<String, String> function)
{
- assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
- assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+ assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS));
}
private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)