a4f79ac72c882fee5a0a9e444f5f570dfe11e15c
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
11
12 import java.util.Map;
13 import java.util.Properties;
14 import java.util.stream.Collectors;
15 import java.util.stream.Stream;
16
17 import static org.assertj.core.api.Assertions.assertThat;
18
19
20 class TestData
21 {
22         static final User PETER = User.of("peter");
23         static final User KLAUS = User.of("klaus");
24
25         static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
26         {
27                         new KeyValue<>(
28                                         TestWord.of(PETER.getUser(),"Hallo"),
29                                         TestCounter.of(PETER.getUser(),"Hallo",1)),
30                         new KeyValue<>(
31                                         TestWord.of(KLAUS.getUser(),"Müsch"),
32                                         TestCounter.of(KLAUS.getUser(),"Müsch",1)),
33                         new KeyValue<>(
34                                         TestWord.of(PETER.getUser(),"Welt"),
35                                         TestCounter.of(PETER.getUser(),"Welt",1)),
36                         new KeyValue<>(
37                                         TestWord.of(KLAUS.getUser(),"Müsch"),
38                                         TestCounter.of(KLAUS.getUser(),"Müsch",2)),
39                         new KeyValue<>(
40                                         TestWord.of(KLAUS.getUser(),"s"),
41                                         TestCounter.of(KLAUS.getUser(),"s",1)),
42                         new KeyValue<>(
43                                         TestWord.of(PETER.getUser(),"Boäh"),
44                                         TestCounter.of(PETER.getUser(),"Boäh",1)),
45                         new KeyValue<>(
46                                         TestWord.of(PETER.getUser(),"Welt"),
47                                         TestCounter.of(PETER.getUser(),"Welt",2)),
48                         new KeyValue<>(
49                                         TestWord.of(PETER.getUser(),"Boäh"),
50                                         TestCounter.of(PETER.getUser(),"Boäh",2)),
51                         new KeyValue<>(
52                                         TestWord.of(KLAUS.getUser(),"s"),
53                                         TestCounter.of(KLAUS.getUser(),"s",2)),
54                         new KeyValue<>(
55                                         TestWord.of(PETER.getUser(),"Boäh"),
56                                         TestCounter.of(PETER.getUser(),"Boäh",3)),
57                         new KeyValue<>(
58                                         TestWord.of(KLAUS.getUser(),"s"),
59                                         TestCounter.of(KLAUS.getUser(),"s",3)),
60         };
61
62         static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
63         {
64                 expectedMessages().forEach(
65                                 (user, rankings) ->
66                                                 assertThat(receivedMessages.get(user))
67                                                                 .containsExactlyElementsOf(rankings));
68         }
69
70         static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
71         {
72                 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
73                 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
74         }
75
76         static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
77         {
78                 assertThat(countMessagesForUser(PETER, receivedMessages));
79                 assertThat(countMessagesForUser(KLAUS, receivedMessages));
80         }
81
82         static int countMessagesForUser(User user, MultiValueMap<User, Ranking> messagesForUsers)
83         {
84                 return messagesForUsers.get(user).size();
85         }
86
87
88         static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
89         {
90                 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
91                 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
92         }
93
94         static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
95         {
96                 assertThat(ranking).isEqualTo(getLastMessageFor(user));
97         }
98
99         static Ranking getLastMessageFor(User user)
100         {
101                 return getLastMessageFor(user, expectedMessages());
102         }
103
104         static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
105         {
106                 return messagesForUsers
107                                 .get(user)
108                                 .stream()
109                                 .reduce(null, (left, right) -> right);
110         }
111
112         static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
113         {
114                         KeyValue.pair( // 0
115                                         PETER,
116                                         Ranking.of(
117                                                         Entry.of("Hallo", 1l))),
118                         KeyValue.pair( // 1
119                                         KLAUS,
120                                         Ranking.of(
121                                                         Entry.of("Müsch", 1l))),
122                         KeyValue.pair( // 2
123                                         PETER,
124                                         Ranking.of(
125                                                         Entry.of("Hallo", 1l),
126                                                         Entry.of("Welt", 1l))),
127                         KeyValue.pair( // 3
128                                         KLAUS,
129                                         Ranking.of(
130                                                         Entry.of("Müsch", 2l))),
131                         KeyValue.pair( // 4
132                                         KLAUS,
133                                         Ranking.of(
134                                                         Entry.of("Müsch", 2l),
135                                                         Entry.of("s", 1l))),
136                         KeyValue.pair( // 5
137                                         PETER,
138                                         Ranking.of(
139                                                         Entry.of("Hallo", 1l),
140                                                         Entry.of("Welt", 1l),
141                                                         Entry.of("Boäh", 1l))),
142                         KeyValue.pair( // 6
143                                         PETER,
144                                         Ranking.of(
145                                                         Entry.of("Welt", 2l),
146                                                         Entry.of("Hallo", 1l),
147                                                         Entry.of("Boäh", 1l))),
148                         KeyValue.pair( // 7
149                                         PETER,
150                                         Ranking.of(
151                                                         Entry.of("Welt", 2l),
152                                                         Entry.of("Boäh", 2l),
153                                                         Entry.of("Hallo", 1l))),
154                         KeyValue.pair( // 8
155                                         KLAUS,
156                                         Ranking.of(
157                                                         Entry.of("Müsch", 2l),
158                                                         Entry.of("s", 2l))),
159                         KeyValue.pair( // 9
160                                         PETER,
161                                         Ranking.of(
162                                                         Entry.of("Boäh", 3l),
163                                                         Entry.of("Welt", 2l),
164                                                         Entry.of("Hallo", 1l))),
165                         KeyValue.pair( // 10
166                                         KLAUS,
167                                         Ranking.of(
168                                                         Entry.of("s", 3l),
169                                                         Entry.of("Müsch", 2l))),
170         };
171
172         static MultiValueMap<User, Ranking> expectedMessages()
173         {
174                 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
175                 Stream
176                                 .of(EXPECTED_MESSAGES)
177                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
178                 return expectedMessages;
179         }
180
181         static Map<String, Object> convertToMap(Properties properties)
182         {
183                 return properties
184                                 .entrySet()
185                                 .stream()
186                                 .collect(
187                                                 Collectors.toMap(
188                                                                 entry -> (String)entry.getKey(),
189                                                                 entry -> entry.getValue()
190                                                 ));
191         }
192
193         static String parseHeader(Headers headers, String key)
194         {
195                 Header header = headers.lastHeader(key);
196                 if (header == null)
197                 {
198                         return key + "=null";
199                 }
200                 else
201                 {
202                         return key + "=" + new String(header.value());
203                 }
204         }
205 }