package de.juplo.kafka.wordcount.query;
-import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.wordcount.top10.TestEntry;
import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
import de.juplo.kafka.wordcount.users.TestUserData;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Arrays;
+import java.util.function.Function;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
class TestData
{
- static final ObjectMapper objectMapper = new ObjectMapper();
- static final String PETER = "peter";
- static final String KLAUS = "klaus";
+ static final TestUser PETER = TestUser.of("peter");
+ static final TestUser KLAUS = TestUser.of("klaus");
- static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
+ static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
{
return Stream.of(TOP10_MESSAGES);
}
return Stream.of(USERS_MESSAGES);
}
- static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
+ static void assertExpectedState(Function<String, UserRanking> function)
{
- assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
- assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+ assertRankingEqualsRankingFromLastMessage(PETER.getUsername(), function.apply(PETER.getUsername()));
+ assertRankingEqualsRankingFromLastMessage(KLAUS.getUsername(), function.apply(KLAUS.getUsername()));
}
- private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
+ private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
{
- assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
- }
-
- private static UserRanking userRankingOf(String json)
- {
- try
- {
- return objectMapper.readValue(json, UserRanking.class);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
}
private static UserRanking getLastMessageFor(String user)
{
return getTop10Messages()
- .filter(kv -> kv.key.equals(user))
+ .filter(kv -> kv.key.getUsername().equals(user))
.map(kv -> kv.value)
.map(testRanking -> userRankingFor(user, testRanking))
.reduce(null, (left, right) -> right);
entry.setCount(testEntry.getCount());
return entry;
}
-
- private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+ private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
PETER,
private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
{
KeyValue.pair(
- PETER,
- TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
+ PETER.getUsername(),
+ TestUserData.of(PETER.getUsername(), "Peter", "Pan", TestUserData.Sex.MALE)),
KeyValue.pair(
- KLAUS,
- TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+ KLAUS.getUsername(),
+ TestUserData.of(KLAUS.getUsername(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
};
}