query: 2.1.2 - Refined tests (added messages, that are filtered)
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / query / TestData.java
1 package de.juplo.kafka.wordcount.query;
2
3 import de.juplo.kafka.wordcount.top10.TestEntry;
4 import de.juplo.kafka.wordcount.top10.TestRanking;
5 import de.juplo.kafka.wordcount.top10.TestUser;
6 import de.juplo.kafka.wordcount.users.TestUserData;
7 import org.apache.kafka.streams.KeyValue;
8
9 import java.util.Arrays;
10 import java.util.function.Function;
11 import java.util.stream.Stream;
12
13 import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE;
14 import static org.assertj.core.api.Assertions.assertThat;
15
16
17 class TestData
18 {
19         static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
20         static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
21         static final TestUser OTHER_CHANNEL = TestUser.of("POPULAR", "klaus");
22
23         static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
24         {
25                 return Stream.of(TOP10_MESSAGES);
26         }
27
28         static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
29         {
30                 return Stream.of(USERS_MESSAGES);
31         }
32
33         static void assertExpectedState(Function<String, UserRanking> function)
34         {
35                 assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel()));
36                 assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel()));
37         }
38
39         private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
40         {
41                 assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
42         }
43
44         private static UserRanking getLastMessageFor(String user)
45         {
46                 return getTop10Messages()
47                                 .filter(kv -> kv.key.getChannel().equals(user))
48                                 .map(kv -> kv.value)
49                                 .map(testRanking -> userRankingFor(user, testRanking))
50                                 .reduce(null, (left, right) -> right);
51         }
52
53         private static UserRanking userRankingFor(String user, TestRanking testRanking)
54         {
55                 TestUserData testUserData = getUsersMessages()
56                                 .filter(kv -> kv.key.equals(user))
57                                 .map(kv -> kv.value)
58                                 .reduce(null, (left, right) -> right);
59
60                 Entry[] entries = Arrays
61                                 .stream(testRanking.getEntries())
62                                 .map(testEntry -> entryOf(testEntry))
63                                 .toArray(size -> new Entry[size]);
64
65                 return UserRanking.of(
66                                 testUserData.getFirstName(),
67                                 testUserData.getLastName(),
68                                 entries);
69         }
70
71         private static Entry entryOf(TestEntry testEntry)
72         {
73                 Entry entry = new Entry();
74                 entry.setKey(testEntry.getKey());
75                 entry.setCounter(testEntry.getCounter());
76                 return entry;
77         }
78         private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
79         {
80                         KeyValue.pair( // 0
81                                         PETER,
82                                         TestRanking.of(
83                                                         TestEntry.of("Hallo", 1l))),
84                         KeyValue.pair( // 1
85                                         KLAUS,
86                                         TestRanking.of(
87                                                         TestEntry.of("Müsch", 1l))),
88                         KeyValue.pair( // 1
89                                         OTHER_CHANNEL,
90                                         TestRanking.of(
91                                                         TestEntry.of("Müsch", 1l))),
92                         KeyValue.pair( // 2
93                                         PETER,
94                                         TestRanking.of(
95                                                         TestEntry.of("Hallo", 1l),
96                                                         TestEntry.of("Welt", 1l))),
97                         KeyValue.pair( // 3
98                                         KLAUS,
99                                         TestRanking.of(
100                                                         TestEntry.of("Müsch", 2l))),
101                         KeyValue.pair( // 4
102                                         KLAUS,
103                                         TestRanking.of(
104                                                         TestEntry.of("Müsch", 2l),
105                                                         TestEntry.of("s", 1l))),
106                         KeyValue.pair( // 5
107                                         PETER,
108                                         TestRanking.of(
109                                                         TestEntry.of("Hallo", 1l),
110                                                         TestEntry.of("Welt", 1l),
111                                                         TestEntry.of("Boäh", 1l))),
112                         KeyValue.pair( // 6
113                                         PETER,
114                                         TestRanking.of(
115                                                         TestEntry.of("Welt", 2l),
116                                                         TestEntry.of("Hallo", 1l),
117                                                         TestEntry.of("Boäh", 1l))),
118                         KeyValue.pair( // 7
119                                         PETER,
120                                         TestRanking.of(
121                                                         TestEntry.of("Welt", 2l),
122                                                         TestEntry.of("Boäh", 2l),
123                                                         TestEntry.of("Hallo", 1l))),
124                         KeyValue.pair( // 8
125                                         KLAUS,
126                                         TestRanking.of(
127                                                         TestEntry.of("Müsch", 2l),
128                                                         TestEntry.of("s", 2l))),
129                         KeyValue.pair( // 8
130                                         OTHER_CHANNEL,
131                                         TestRanking.of(
132                                                         TestEntry.of("Müsch", 2l),
133                                                         TestEntry.of("s", 2l))),
134                         KeyValue.pair( // 9
135                                         PETER,
136                                         TestRanking.of(
137                                                         TestEntry.of("Boäh", 3l),
138                                                         TestEntry.of("Welt", 2l),
139                                                         TestEntry.of("Hallo", 1l))),
140                         KeyValue.pair( // 10
141                                         KLAUS,
142                                         TestRanking.of(
143                                                         TestEntry.of("s", 3l),
144                                                         TestEntry.of("Müsch", 2l))),
145         };
146
147         private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
148         {
149                         KeyValue.pair(
150                                         PETER.getChannel(),
151                                         TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)),
152                         KeyValue.pair(
153                                         KLAUS.getChannel(),
154                                         TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
155         };
156 }