query: 1.0.6 - Added `QueryStreamProcessorTopologyTest`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / query / TestData.java
1 package de.juplo.kafka.wordcount.query;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import de.juplo.kafka.wordcount.top10.TestEntry;
5 import de.juplo.kafka.wordcount.top10.TestRanking;
6 import de.juplo.kafka.wordcount.users.TestUserData;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9
10 import java.util.Arrays;
11 import java.util.stream.Stream;
12
13 import static org.assertj.core.api.Assertions.assertThat;
14
15
16 class TestData
17 {
18         static final ObjectMapper objectMapper = new ObjectMapper();
19         static final String PETER = "peter";
20         static final String KLAUS = "klaus";
21
22         static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
23         {
24                 return Stream.of(TOP10_MESSAGES);
25         }
26
27         static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
28         {
29                 return Stream.of(USERS_MESSAGES);
30         }
31
32         static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
33         {
34                 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
35                 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
36         }
37
38         private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
39         {
40                 assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
41         }
42
43         private static UserRanking userRankingOf(String json)
44         {
45                 try
46                 {
47                         return objectMapper.readValue(json, UserRanking.class);
48                 }
49                 catch (Exception e)
50                 {
51                         throw new RuntimeException(e);
52                 }
53         }
54
55         private static UserRanking getLastMessageFor(String user)
56         {
57                 return getTop10Messages()
58                                 .filter(kv -> kv.key.equals(user))
59                                 .map(kv -> kv.value)
60                                 .map(testRanking -> userRankingFor(user, testRanking))
61                                 .reduce(null, (left, right) -> right);
62         }
63
64         private static UserRanking userRankingFor(String user, TestRanking testRanking)
65         {
66                 TestUserData testUserData = getUsersMessages()
67                                 .filter(kv -> kv.key.equals(user))
68                                 .map(kv -> kv.value)
69                                 .reduce(null, (left, right) -> right);
70
71                 Entry[] entries = Arrays
72                                 .stream(testRanking.getEntries())
73                                 .map(testEntry -> entryOf(testEntry))
74                                 .toArray(size -> new Entry[size]);
75
76                 return UserRanking.of(
77                                 testUserData.getFirstName(),
78                                 testUserData.getLastName(),
79                                 entries);
80         }
81
82         private static Entry entryOf(TestEntry testEntry)
83         {
84                 Entry entry = new Entry();
85                 entry.setWord(testEntry.getWord());
86                 entry.setCount(testEntry.getCount());
87                 return entry;
88         }
89
90         private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
91         {
92                         KeyValue.pair( // 0
93                                         PETER,
94                                         TestRanking.of(
95                                                         TestEntry.of("Hallo", 1l))),
96                         KeyValue.pair( // 1
97                                         KLAUS,
98                                         TestRanking.of(
99                                                         TestEntry.of("Müsch", 1l))),
100                         KeyValue.pair( // 2
101                                         PETER,
102                                         TestRanking.of(
103                                                         TestEntry.of("Hallo", 1l),
104                                                         TestEntry.of("Welt", 1l))),
105                         KeyValue.pair( // 3
106                                         KLAUS,
107                                         TestRanking.of(
108                                                         TestEntry.of("Müsch", 2l))),
109                         KeyValue.pair( // 4
110                                         KLAUS,
111                                         TestRanking.of(
112                                                         TestEntry.of("Müsch", 2l),
113                                                         TestEntry.of("s", 1l))),
114                         KeyValue.pair( // 5
115                                         PETER,
116                                         TestRanking.of(
117                                                         TestEntry.of("Hallo", 1l),
118                                                         TestEntry.of("Welt", 1l),
119                                                         TestEntry.of("Boäh", 1l))),
120                         KeyValue.pair( // 6
121                                         PETER,
122                                         TestRanking.of(
123                                                         TestEntry.of("Welt", 2l),
124                                                         TestEntry.of("Hallo", 1l),
125                                                         TestEntry.of("Boäh", 1l))),
126                         KeyValue.pair( // 7
127                                         PETER,
128                                         TestRanking.of(
129                                                         TestEntry.of("Welt", 2l),
130                                                         TestEntry.of("Boäh", 2l),
131                                                         TestEntry.of("Hallo", 1l))),
132                         KeyValue.pair( // 8
133                                         KLAUS,
134                                         TestRanking.of(
135                                                         TestEntry.of("Müsch", 2l),
136                                                         TestEntry.of("s", 2l))),
137                         KeyValue.pair( // 9
138                                         PETER,
139                                         TestRanking.of(
140                                                         TestEntry.of("Boäh", 3l),
141                                                         TestEntry.of("Welt", 2l),
142                                                         TestEntry.of("Hallo", 1l))),
143                         KeyValue.pair( // 10
144                                         KLAUS,
145                                         TestRanking.of(
146                                                         TestEntry.of("s", 3l),
147                                                         TestEntry.of("Müsch", 2l))),
148         };
149
150         private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
151         {
152                         KeyValue.pair(
153                                         PETER,
154                                         TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
155                         KeyValue.pair(
156                                         KLAUS,
157                                         TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
158         };
159 }