+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestEntry;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+ static final ObjectMapper objectMapper = new ObjectMapper();
+ static final String PETER = "peter";
+ static final String KLAUS = "klaus";
+
+ static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
+ {
+ return Stream.of(TOP10_MESSAGES);
+ }
+
+ static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
+ {
+ return Stream.of(USERS_MESSAGES);
+ }
+
+ static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+ }
+
+ private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
+ {
+ assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
+ }
+
+ private static UserRanking userRankingOf(String json)
+ {
+ try
+ {
+ return objectMapper.readValue(json, UserRanking.class);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static UserRanking getLastMessageFor(String user)
+ {
+ return getTop10Messages()
+ .filter(kv -> kv.key.equals(user))
+ .map(kv -> kv.value)
+ .map(testRanking -> userRankingFor(user, testRanking))
+ .reduce(null, (left, right) -> right);
+ }
+
+ private static UserRanking userRankingFor(String user, TestRanking testRanking)
+ {
+ TestUserData testUserData = getUsersMessages()
+ .filter(kv -> kv.key.equals(user))
+ .map(kv -> kv.value)
+ .reduce(null, (left, right) -> right);
+
+ Entry[] entries = Arrays
+ .stream(testRanking.getEntries())
+ .map(testEntry -> entryOf(testEntry))
+ .toArray(size -> new Entry[size]);
+
+ return UserRanking.of(
+ testUserData.getFirstName(),
+ testUserData.getLastName(),
+ entries);
+ }
+
+ private static Entry entryOf(TestEntry testEntry)
+ {
+ Entry entry = new Entry();
+ entry.setWord(testEntry.getWord());
+ entry.setCount(testEntry.getCount());
+ return entry;
+ }
+
+ private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair( // 0
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l))),
+ KeyValue.pair( // 1
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 1l))),
+ KeyValue.pair( // 2
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Welt", 1l))),
+ KeyValue.pair( // 3
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l))),
+ KeyValue.pair( // 4
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l),
+ TestEntry.of("s", 1l))),
+ KeyValue.pair( // 5
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Welt", 1l),
+ TestEntry.of("Boäh", 1l))),
+ KeyValue.pair( // 6
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Boäh", 1l))),
+ KeyValue.pair( // 7
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Boäh", 2l),
+ TestEntry.of("Hallo", 1l))),
+ KeyValue.pair( // 8
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l),
+ TestEntry.of("s", 2l))),
+ KeyValue.pair( // 9
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Boäh", 3l),
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Hallo", 1l))),
+ KeyValue.pair( // 10
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("s", 3l),
+ TestEntry.of("Müsch", 2l))),
+ };
+
+ private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair(
+ PETER,
+ TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
+ KeyValue.pair(
+ KLAUS,
+ TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+ };
+}