top10: 1.2.1 - Refined `Top10StreamProcessorTopologyTest`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
11
12 import java.util.stream.Stream;
13
14 import static org.assertj.core.api.Assertions.assertThat;
15
16
17 class TestData
18 {
19         static final User PETER = User.of("peter");
20         static final User KLAUS = User.of("klaus");
21
22         static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
23         {
24                         new KeyValue<>(
25                                         TestWord.of(PETER.getUser(),"Hallo"),
26                                         TestCounter.of(PETER.getUser(),"Hallo",1)),
27                         new KeyValue<>(
28                                         TestWord.of(KLAUS.getUser(),"Müsch"),
29                                         TestCounter.of(KLAUS.getUser(),"Müsch",1)),
30                         new KeyValue<>(
31                                         TestWord.of(PETER.getUser(),"Welt"),
32                                         TestCounter.of(PETER.getUser(),"Welt",1)),
33                         new KeyValue<>(
34                                         TestWord.of(KLAUS.getUser(),"Müsch"),
35                                         TestCounter.of(KLAUS.getUser(),"Müsch",2)),
36                         new KeyValue<>(
37                                         TestWord.of(KLAUS.getUser(),"s"),
38                                         TestCounter.of(KLAUS.getUser(),"s",1)),
39                         new KeyValue<>(
40                                         TestWord.of(PETER.getUser(),"Boäh"),
41                                         TestCounter.of(PETER.getUser(),"Boäh",1)),
42                         new KeyValue<>(
43                                         TestWord.of(PETER.getUser(),"Welt"),
44                                         TestCounter.of(PETER.getUser(),"Welt",2)),
45                         new KeyValue<>(
46                                         TestWord.of(PETER.getUser(),"Boäh"),
47                                         TestCounter.of(PETER.getUser(),"Boäh",2)),
48                         new KeyValue<>(
49                                         TestWord.of(KLAUS.getUser(),"s"),
50                                         TestCounter.of(KLAUS.getUser(),"s",2)),
51                         new KeyValue<>(
52                                         TestWord.of(PETER.getUser(),"Boäh"),
53                                         TestCounter.of(PETER.getUser(),"Boäh",3)),
54                         new KeyValue<>(
55                                         TestWord.of(KLAUS.getUser(),"s"),
56                                         TestCounter.of(KLAUS.getUser(),"s",3)),
57         };
58
59         static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
60         {
61                 expectedMessages().forEach(
62                                 (user, rankings) ->
63                                                 assertThat(receivedMessages.get(user))
64                                                                 .containsExactlyElementsOf(rankings));
65         }
66
67         static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
68         {
69                 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
70                 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
71         }
72
73         static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
74         {
75                 assertThat(countMessagesForUser(PETER, receivedMessages));
76                 assertThat(countMessagesForUser(KLAUS, receivedMessages));
77         }
78
79         static int countMessagesForUser(User user, MultiValueMap<User, Ranking> messagesForUsers)
80         {
81                 return messagesForUsers.get(user).size();
82         }
83
84
85         static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
86         {
87                 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
88                 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
89         }
90
91         static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
92         {
93                 assertThat(ranking).isEqualTo(getLastMessageFor(user));
94         }
95
96         static Ranking getLastMessageFor(User user)
97         {
98                 return getLastMessageFor(user, expectedMessages());
99         }
100
101         static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
102         {
103                 return messagesForUsers
104                                 .get(user)
105                                 .stream()
106                                 .reduce(null, (left, right) -> right);
107         }
108
109         static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
110         {
111                         KeyValue.pair( // 0
112                                         PETER,
113                                         Ranking.of(
114                                                         Entry.of("Hallo", 1l))),
115                         KeyValue.pair( // 1
116                                         KLAUS,
117                                         Ranking.of(
118                                                         Entry.of("Müsch", 1l))),
119                         KeyValue.pair( // 2
120                                         PETER,
121                                         Ranking.of(
122                                                         Entry.of("Hallo", 1l),
123                                                         Entry.of("Welt", 1l))),
124                         KeyValue.pair( // 3
125                                         KLAUS,
126                                         Ranking.of(
127                                                         Entry.of("Müsch", 2l))),
128                         KeyValue.pair( // 4
129                                         KLAUS,
130                                         Ranking.of(
131                                                         Entry.of("Müsch", 2l),
132                                                         Entry.of("s", 1l))),
133                         KeyValue.pair( // 5
134                                         PETER,
135                                         Ranking.of(
136                                                         Entry.of("Hallo", 1l),
137                                                         Entry.of("Welt", 1l),
138                                                         Entry.of("Boäh", 1l))),
139                         KeyValue.pair( // 6
140                                         PETER,
141                                         Ranking.of(
142                                                         Entry.of("Welt", 2l),
143                                                         Entry.of("Hallo", 1l),
144                                                         Entry.of("Boäh", 1l))),
145                         KeyValue.pair( // 7
146                                         PETER,
147                                         Ranking.of(
148                                                         Entry.of("Welt", 2l),
149                                                         Entry.of("Boäh", 2l),
150                                                         Entry.of("Hallo", 1l))),
151                         KeyValue.pair( // 8
152                                         KLAUS,
153                                         Ranking.of(
154                                                         Entry.of("Müsch", 2l),
155                                                         Entry.of("s", 2l))),
156                         KeyValue.pair( // 9
157                                         PETER,
158                                         Ranking.of(
159                                                         Entry.of("Boäh", 3l),
160                                                         Entry.of("Welt", 2l),
161                                                         Entry.of("Hallo", 1l))),
162                         KeyValue.pair( // 10
163                                         KLAUS,
164                                         Ranking.of(
165                                                         Entry.of("s", 3l),
166                                                         Entry.of("Müsch", 2l))),
167         };
168
169         static MultiValueMap<User, Ranking> expectedMessages()
170         {
171                 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
172                 Stream
173                                 .of(EXPECTED_MESSAGES)
174                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
175                 return expectedMessages;
176         }
177
178         static String parseHeader(Headers headers, String key)
179         {
180                 Header header = headers.lastHeader(key);
181                 if (header == null)
182                 {
183                         return key + "=null";
184                 }
185                 else
186                 {
187                         return key + "=" + new String(header.value());
188                 }
189         }
190 }