1 package de.juplo.kafka.wordcount.query;
3 import de.juplo.kafka.wordcount.top10.TestEntry;
4 import de.juplo.kafka.wordcount.top10.TestRanking;
5 import de.juplo.kafka.wordcount.top10.TestUser;
6 import de.juplo.kafka.wordcount.users.TestUserData;
7 import org.apache.kafka.streams.KeyValue;
9 import java.util.Arrays;
10 import java.util.function.Function;
11 import java.util.stream.Stream;
13 import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE;
14 import static org.assertj.core.api.Assertions.assertThat;
// Test user created from STATS_TYPE and "peter".
19 static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
// Test user created from STATS_TYPE and "klaus".
20 static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
// Same second argument as KLAUS ("klaus"), but with the literal "POPULAR" in place of STATS_TYPE.
// NOTE(review): presumably stands for messages that must not influence the result for KLAUS - confirm.
21 static final TestUser OTHER_CHANNEL = TestUser.of("POPULAR", "klaus");
23 static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
25 return Stream.of(TOP10_MESSAGES);
28 static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
30 return Stream.of(USERS_MESSAGES);
33 static void assertExpectedState(Function<String, UserRanking> function)
35 assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel()));
36 assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel()));
39 private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
41 assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
44 private static UserRanking getLastMessageFor(String user)
46 return getTop10Messages()
47 .filter(kv -> kv.key.getChannel().equals(user))
49 .map(testRanking -> userRankingFor(user, testRanking))
50 .reduce(null, (left, right) -> right);
// Builds the UserRanking for the given user from a TestRanking, combining the
// user's data from getUsersMessages() with the converted entries of the ranking.
53 private static UserRanking userRankingFor(String user, TestRanking testRanking)
// Only messages whose key equals `user` are kept. The reduce returns its right-hand
// operand, so the last match wins; with no match the identity value null is the result.
55 TestUserData testUserData = getUsersMessages()
56 .filter(kv -> kv.key.equals(user))
58 .reduce(null, (left, right) -> right);
// Each TestEntry of the ranking is converted into an Entry via entryOf.
60 Entry[] entries = Arrays
61 .stream(testRanking.getEntries())
62 .map(testEntry -> entryOf(testEntry))
63 .toArray(size -> new Entry[size]);
// NOTE(review): testUserData is dereferenced without a null check - for a user without
// a matching users message this would fail with a NullPointerException; confirm that
// every user asked for is present in the users fixture.
65 return UserRanking.of(
66 testUserData.getFirstName(),
67 testUserData.getLastName(),
71 private static Entry entryOf(TestEntry testEntry)
73 Entry entry = new Entry();
74 entry.setKey(testEntry.getKey());
75 entry.setCounter(testEntry.getCounter());
// Fixture backing getTop10Messages(): the rankings are made of TestEntry.of(word, counter) entries.
// The array is created as a raw KeyValue[] and assigned to a generic array type (unchecked conversion).
// The counters are long literals written with a lowercase suffix (1l); the uppercase
// form (1L) is easier to tell apart from the digit one.
// NOTE(review): the field is static but not final - presumably never reassigned; confirm and consider final.
78 private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
83 TestEntry.of("Hallo", 1l))),
87 TestEntry.of("Müsch", 1l))),
91 TestEntry.of("Müsch", 1l))),
95 TestEntry.of("Hallo", 1l),
96 TestEntry.of("Welt", 1l))),
100 TestEntry.of("Müsch", 2l))),
104 TestEntry.of("Müsch", 2l),
105 TestEntry.of("s", 1l))),
109 TestEntry.of("Hallo", 1l),
110 TestEntry.of("Welt", 1l),
111 TestEntry.of("Boäh", 1l))),
115 TestEntry.of("Welt", 2l),
116 TestEntry.of("Hallo", 1l),
117 TestEntry.of("Boäh", 1l))),
121 TestEntry.of("Welt", 2l),
122 TestEntry.of("Boäh", 2l),
123 TestEntry.of("Hallo", 1l))),
127 TestEntry.of("Müsch", 2l),
128 TestEntry.of("s", 2l))),
132 TestEntry.of("Müsch", 2l),
133 TestEntry.of("s", 2l))),
137 TestEntry.of("Boäh", 3l),
138 TestEntry.of("Welt", 2l),
139 TestEntry.of("Hallo", 1l))),
143 TestEntry.of("s", 3l),
144 TestEntry.of("Müsch", 2l))),
// Fixture backing getUsersMessages(): one TestUserData each for the channels of PETER and KLAUS.
// The array is created as a raw KeyValue[] and assigned to a generic array type (unchecked conversion).
// NOTE(review): the two string arguments after the channel are presumably first and last name
// (userRankingFor reads getFirstName()/getLastName()) - confirm against TestUserData.of.
147 private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
151 TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)),
154 TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),