1 package de.juplo.kafka.wordcount.stats;
3 import de.juplo.kafka.wordcount.top10.TestEntry;
4 import de.juplo.kafka.wordcount.top10.TestRanking;
5 import de.juplo.kafka.wordcount.top10.TestUser;
6 import de.juplo.kafka.wordcount.users.TestUserData;
7 import org.apache.kafka.streams.KeyValue;
9 import java.util.Arrays;
10 import java.util.function.Function;
11 import java.util.stream.Stream;
13 import static org.assertj.core.api.Assertions.assertThat;
/**
 * Keys used by the fixture messages of this class. PETER and KLAUS are both
 * created with the name of StatisticsType.POPULAR as first factory argument,
 * OTHER_CHANNEL with the literal "COUNTER".
 * NOTE(review): which of the two factory arguments getChannel() returns is
 * decided inside TestUser, which is not visible here; confirm before relying
 * on it.
 */
18 static final TestUser PETER = TestUser.of(StatisticsType.POPULAR.name(), "peter");
19 static final TestUser KLAUS = TestUser.of(StatisticsType.POPULAR.name(), "klaus");
20 static final TestUser OTHER_CHANNEL = TestUser.of("COUNTER", "klaus");
/**
 * Returns a stream over the TOP10_MESSAGES fixture array.
 * Each call builds a new stream, so the result can be consumed independently
 * by every caller.
 *
 * @return stream of key/ranking pairs in array order
 */
22 static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
24 return Stream.of(TOP10_MESSAGES);
/**
 * Returns a stream over the USERS_MESSAGES fixture array.
 * Each call builds a new stream, so the result can be consumed independently
 * by every caller.
 *
 * @return stream of key/user-data pairs in array order
 */
27 static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
29 return Stream.of(USERS_MESSAGES);
/**
 * Verifies the state exposed through the given lookup: for the channel of
 * PETER and for the channel of KLAUS, the looked-up ranking must equal the
 * ranking derived from the last matching fixture message.
 *
 * @param function lookup that receives a channel string and returns the
 *                 UserRanking to verify
 */
32 static void assertExpectedState(Function<String, UserRanking> function)
// Only PETER and KLAUS are checked; OTHER_CHANNEL is not asserted here.
34 assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel()));
35 assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel()));
/**
 * Asserts (AssertJ isEqualTo) that the given ranking equals the ranking
 * computed by getLastMessageFor for the same identifier.
 *
 * @param user        identifier passed on to getLastMessageFor
 * @param rankingJson actual ranking to verify
 */
38 private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
40 assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
/**
 * Computes the expected ranking for the given identifier from the fixture
 * messages: keeps the messages whose key channel equals the identifier,
 * converts them with userRankingFor and returns the last one.
 *
 * @param user identifier compared against getChannel() of each message key
 * @return the ranking built from the last matching message, or null if no
 *         message matches
 */
43 private static UserRanking getLastMessageFor(String user)
45 return getTop10Messages()
46 .filter(kv -> kv.key.getChannel().equals(user))
// NOTE(review): the mapping below names its argument testRanking although the
// filter above works on KeyValue pairs; a step that unwraps the value is
// presumably missing from this view; confirm.
48 .map(testRanking -> userRankingFor(user, testRanking))
// The accumulator always returns its right operand, so the reduction yields
// the last element; the null identity is the result for an empty stream.
49 .reduce(null, (left, right) -> right);
/**
 * Builds the expected UserRanking for one fixture ranking: looks up the last
 * users message whose key equals the given identifier and combines its first
 * and last name with the converted ranking entries.
 *
 * @param user        key to look up in the users messages
 * @param testRanking ranking whose entries are converted via entryOf
 * @return the UserRanking created by UserRanking.of
 */
52 private static UserRanking userRankingFor(String user, TestRanking testRanking)
// Last users message for the key wins; the reduction yields null when no
// message matches.
// NOTE(review): a step unwrapping the KeyValue into TestUserData is presumably
// missing from this view; confirm.
54 TestUserData testUserData = getUsersMessages()
55 .filter(kv -> kv.key.equals(user))
57 .reduce(null, (left, right) -> right);
// Convert every TestEntry of the ranking into an Entry, keeping the order.
59 Entry[] entries = Arrays
60 .stream(testRanking.getEntries())
61 .map(testEntry -> entryOf(testEntry))
62 .toArray(size -> new Entry[size]);
// NOTE(review): testUserData is dereferenced without a null check, so an
// identifier without a users message ends in a NullPointerException here.
// The remaining arguments of UserRanking.of are not visible in this view.
64 return UserRanking.of(
65 testUserData.getFirstName(),
66 testUserData.getLastName(),
/**
 * Converts a TestEntry into an Entry by copying its key and its counter into
 * a newly created Entry.
 * NOTE(review): the return statement is not visible in this view; presumably
 * the populated entry is returned; confirm.
 *
 * @param testEntry source of key and counter
 */
70 private static Entry entryOf(TestEntry testEntry)
72 Entry entry = new Entry();
73 entry.setKey(testEntry.getKey());
74 entry.setCounter(testEntry.getCounter());
/**
 * Fixture: the sequence of key/ranking messages served by getTop10Messages().
 * The order matters, because getLastMessageFor() takes the last matching
 * message as the expected result.
 * NOTE(review): the keys of the pairs are not visible in this view.
 * NOTE(review): "new KeyValue[]" creates a raw array that is assigned to a
 * generic array type (unchecked), and the field is not final although it is
 * only read in the visible code.
 * NOTE(review): the long literals use a lowercase "l" suffix, which is easily
 * misread as the digit 1; an uppercase "L" would be clearer.
 */
77 private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
82 TestEntry.of("Hallo", 1l))),
83 KeyValue.pair( // BOOM!
86 TestEntry.of("Müsch", 1l))),
90 TestEntry.of("Müsch", 1l))),
94 TestEntry.of("Hallo", 1l),
95 TestEntry.of("Welt", 1l))),
99 TestEntry.of("Müsch", 2l))),
103 TestEntry.of("Müsch", 2l),
104 TestEntry.of("s", 1l))),
108 TestEntry.of("Hallo", 1l),
109 TestEntry.of("Welt", 1l),
110 TestEntry.of("Boäh", 1l))),
114 TestEntry.of("Welt", 2l),
115 TestEntry.of("Hallo", 1l),
116 TestEntry.of("Boäh", 1l))),
120 TestEntry.of("Welt", 2l),
121 TestEntry.of("Boäh", 2l),
122 TestEntry.of("Hallo", 1l))),
126 TestEntry.of("Müsch", 2l),
127 TestEntry.of("s", 2l))),
128 KeyValue.pair( // BOOM!
131 TestEntry.of("Müsch", 2l),
132 TestEntry.of("s", 2l))),
136 TestEntry.of("Boäh", 3l),
137 TestEntry.of("Welt", 2l),
138 TestEntry.of("Hallo", 1l))),
142 TestEntry.of("s", 3l),
143 TestEntry.of("Müsch", 2l))),
/**
 * Fixture: the users messages served by getUsersMessages(). The visible
 * entries carry user data for the channels of PETER and KLAUS.
 * NOTE(review): the keys of the pairs and the end of the initializer are not
 * visible in this view.
 * NOTE(review): "new KeyValue[]" creates a raw array that is assigned to a
 * generic array type (unchecked), and the field is not final although it is
 * only read in the visible code.
 */
146 private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
150 TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)),
153 TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),