1 package de.juplo.kafka.wordcount.stats;
3 import de.juplo.kafka.wordcount.top10.TestEntry;
4 import de.juplo.kafka.wordcount.top10.TestRanking;
5 import de.juplo.kafka.wordcount.top10.TestUser;
6 import de.juplo.kafka.wordcount.users.TestUserData;
7 import org.apache.kafka.streams.KeyValue;
9 import java.util.Arrays;
10 import java.util.function.Function;
11 import java.util.stream.Stream;
13 import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STATS_TYPE;
14 import static org.assertj.core.api.Assertions.assertThat;
19 static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
20 static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
22 static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
24 return Stream.of(TOP10_MESSAGES);
27 static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
29 return Stream.of(USERS_MESSAGES);
32 static void assertExpectedState(Function<String, UserRanking> function)
34 assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel()));
35 assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel()));
38 private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
40 assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
43 private static UserRanking getLastMessageFor(String user)
45 return getTop10Messages()
46 .filter(kv -> kv.key.getChannel().equals(user))
48 .map(testRanking -> userRankingFor(user, testRanking))
49 .reduce(null, (left, right) -> right);
52 private static UserRanking userRankingFor(String user, TestRanking testRanking)
54 TestUserData testUserData = getUsersMessages()
55 .filter(kv -> kv.key.equals(user))
57 .reduce(null, (left, right) -> right);
59 Entry[] entries = Arrays
60 .stream(testRanking.getEntries())
61 .map(testEntry -> entryOf(testEntry))
62 .toArray(size -> new Entry[size]);
64 return UserRanking.of(
65 testUserData.getFirstName(),
66 testUserData.getLastName(),
70 private static Entry entryOf(TestEntry testEntry)
72 Entry entry = new Entry();
73 entry.setKey(testEntry.getKey());
74 entry.setCounter(testEntry.getCounter());
77 private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
82 TestEntry.of("Hallo", 1l))),
86 TestEntry.of("Müsch", 1l))),
90 TestEntry.of("Hallo", 1l),
91 TestEntry.of("Welt", 1l))),
95 TestEntry.of("Müsch", 2l))),
99 TestEntry.of("Müsch", 2l),
100 TestEntry.of("s", 1l))),
104 TestEntry.of("Hallo", 1l),
105 TestEntry.of("Welt", 1l),
106 TestEntry.of("Boäh", 1l))),
110 TestEntry.of("Welt", 2l),
111 TestEntry.of("Hallo", 1l),
112 TestEntry.of("Boäh", 1l))),
116 TestEntry.of("Welt", 2l),
117 TestEntry.of("Boäh", 2l),
118 TestEntry.of("Hallo", 1l))),
122 TestEntry.of("Müsch", 2l),
123 TestEntry.of("s", 2l))),
127 TestEntry.of("Boäh", 3l),
128 TestEntry.of("Welt", 2l),
129 TestEntry.of("Hallo", 1l))),
133 TestEntry.of("s", 3l),
134 TestEntry.of("Müsch", 2l))),
137 private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
141 TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)),
144 TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),