query: 1.0.6 - Added `QueryApplicationIT`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / query / TestData.java
1 package de.juplo.kafka.wordcount.query;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import de.juplo.kafka.wordcount.top10.TestEntry;
5 import de.juplo.kafka.wordcount.top10.TestRanking;
6 import de.juplo.kafka.wordcount.users.TestUserData;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9
10 import java.util.Arrays;
11 import java.util.stream.Stream;
12
13 import static org.assertj.core.api.Assertions.assertThat;
14
15
16 class TestData
17 {
18         static final ObjectMapper objectMapper = new ObjectMapper();
19         static final String PETER = "peter";
20         static final String KLAUS = "klaus";
21
22         static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
23         {
24                 return Stream.of(TOP10_MESSAGES);
25         }
26
27         static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
28         {
29                 return Stream.of(USERS_MESSAGES);
30         }
31
32         static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
33         {
34                 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
35                 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
36         }
37
38         private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
39         {
40                 assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
41         }
42
43         private static UserRanking userRankingOf(String json)
44         {
45                 if (json == null)
46                 {
47                         return null;
48                 }
49
50                 try
51                 {
52                         return objectMapper.readValue(json, UserRanking.class);
53                 }
54                 catch (Exception e)
55                 {
56                         throw new RuntimeException(e);
57                 }
58         }
59
60         private static UserRanking getLastMessageFor(String user)
61         {
62                 return getTop10Messages()
63                                 .filter(kv -> kv.key.equals(user))
64                                 .map(kv -> kv.value)
65                                 .map(testRanking -> userRankingFor(user, testRanking))
66                                 .reduce(null, (left, right) -> right);
67         }
68
69         private static UserRanking userRankingFor(String user, TestRanking testRanking)
70         {
71                 TestUserData testUserData = getUsersMessages()
72                                 .filter(kv -> kv.key.equals(user))
73                                 .map(kv -> kv.value)
74                                 .reduce(null, (left, right) -> right);
75
76                 Entry[] entries = Arrays
77                                 .stream(testRanking.getEntries())
78                                 .map(testEntry -> entryOf(testEntry))
79                                 .toArray(size -> new Entry[size]);
80
81                 return UserRanking.of(
82                                 testUserData.getFirstName(),
83                                 testUserData.getLastName(),
84                                 entries);
85         }
86
87         private static Entry entryOf(TestEntry testEntry)
88         {
89                 Entry entry = new Entry();
90                 entry.setWord(testEntry.getWord());
91                 entry.setCount(testEntry.getCount());
92                 return entry;
93         }
94
95         private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
96         {
97                         KeyValue.pair( // 0
98                                         PETER,
99                                         TestRanking.of(
100                                                         TestEntry.of("Hallo", 1l))),
101                         KeyValue.pair( // 1
102                                         KLAUS,
103                                         TestRanking.of(
104                                                         TestEntry.of("Müsch", 1l))),
105                         KeyValue.pair( // 2
106                                         PETER,
107                                         TestRanking.of(
108                                                         TestEntry.of("Hallo", 1l),
109                                                         TestEntry.of("Welt", 1l))),
110                         KeyValue.pair( // 3
111                                         KLAUS,
112                                         TestRanking.of(
113                                                         TestEntry.of("Müsch", 2l))),
114                         KeyValue.pair( // 4
115                                         KLAUS,
116                                         TestRanking.of(
117                                                         TestEntry.of("Müsch", 2l),
118                                                         TestEntry.of("s", 1l))),
119                         KeyValue.pair( // 5
120                                         PETER,
121                                         TestRanking.of(
122                                                         TestEntry.of("Hallo", 1l),
123                                                         TestEntry.of("Welt", 1l),
124                                                         TestEntry.of("Boäh", 1l))),
125                         KeyValue.pair( // 6
126                                         PETER,
127                                         TestRanking.of(
128                                                         TestEntry.of("Welt", 2l),
129                                                         TestEntry.of("Hallo", 1l),
130                                                         TestEntry.of("Boäh", 1l))),
131                         KeyValue.pair( // 7
132                                         PETER,
133                                         TestRanking.of(
134                                                         TestEntry.of("Welt", 2l),
135                                                         TestEntry.of("Boäh", 2l),
136                                                         TestEntry.of("Hallo", 1l))),
137                         KeyValue.pair( // 8
138                                         KLAUS,
139                                         TestRanking.of(
140                                                         TestEntry.of("Müsch", 2l),
141                                                         TestEntry.of("s", 2l))),
142                         KeyValue.pair( // 9
143                                         PETER,
144                                         TestRanking.of(
145                                                         TestEntry.of("Boäh", 3l),
146                                                         TestEntry.of("Welt", 2l),
147                                                         TestEntry.of("Hallo", 1l))),
148                         KeyValue.pair( // 10
149                                         KLAUS,
150                                         TestRanking.of(
151                                                         TestEntry.of("s", 3l),
152                                                         TestEntry.of("Müsch", 2l))),
153         };
154
155         private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
156         {
157                         KeyValue.pair(
158                                         PETER,
159                                         TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
160                         KeyValue.pair(
161                                         KLAUS,
162                                         TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
163         };
164 }