1 package de.juplo.kafka.wordcount.query;
3 import de.juplo.kafka.wordcount.top10.TestEntry;
4 import de.juplo.kafka.wordcount.top10.TestRanking;
5 import de.juplo.kafka.wordcount.users.TestUserData;
6 import org.apache.kafka.streams.KeyValue;
8 import java.util.Arrays;
9 import java.util.function.Function;
10 import java.util.stream.Stream;
12 import static org.assertj.core.api.Assertions.assertThat;
17 static final String PETER = "peter";
18 static final String KLAUS = "klaus";
20 static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
22 return Stream.of(TOP10_MESSAGES);
25 static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
27 return Stream.of(USERS_MESSAGES);
30 static void assertExpectedState(Function<String, UserRanking> function)
32 assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER));
33 assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS));
36 private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
38 assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
41 private static UserRanking getLastMessageFor(String user)
43 return getTop10Messages()
44 .filter(kv -> kv.key.equals(user))
46 .map(testRanking -> userRankingFor(user, testRanking))
47 .reduce(null, (left, right) -> right);
50 private static UserRanking userRankingFor(String user, TestRanking testRanking)
52 TestUserData testUserData = getUsersMessages()
53 .filter(kv -> kv.key.equals(user))
55 .reduce(null, (left, right) -> right);
57 Entry[] entries = Arrays
58 .stream(testRanking.getEntries())
59 .map(testEntry -> entryOf(testEntry))
60 .toArray(size -> new Entry[size]);
62 return UserRanking.of(
63 testUserData.getFirstName(),
64 testUserData.getLastName(),
68 private static Entry entryOf(TestEntry testEntry)
70 Entry entry = new Entry();
71 entry.setWord(testEntry.getWord());
72 entry.setCount(testEntry.getCount());
76 private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
81 TestEntry.of("Hallo", 1l))),
85 TestEntry.of("Müsch", 1l))),
89 TestEntry.of("Hallo", 1l),
90 TestEntry.of("Welt", 1l))),
94 TestEntry.of("Müsch", 2l))),
98 TestEntry.of("Müsch", 2l),
99 TestEntry.of("s", 1l))),
103 TestEntry.of("Hallo", 1l),
104 TestEntry.of("Welt", 1l),
105 TestEntry.of("Boäh", 1l))),
109 TestEntry.of("Welt", 2l),
110 TestEntry.of("Hallo", 1l),
111 TestEntry.of("Boäh", 1l))),
115 TestEntry.of("Welt", 2l),
116 TestEntry.of("Boäh", 2l),
117 TestEntry.of("Hallo", 1l))),
121 TestEntry.of("Müsch", 2l),
122 TestEntry.of("s", 2l))),
126 TestEntry.of("Boäh", 3l),
127 TestEntry.of("Welt", 2l),
128 TestEntry.of("Hallo", 1l))),
132 TestEntry.of("s", 3l),
133 TestEntry.of("Müsch", 2l))),
136 private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
140 TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
143 TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),