008cdde8dfac113749e29ad3733ce4a628637b3d
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
11
12 import java.util.Map;
13 import java.util.Properties;
14 import java.util.stream.Collectors;
15 import java.util.stream.Stream;
16
17 import static org.assertj.core.api.Assertions.assertThat;
18
19
20 class TestData
21 {
22         static final User PETER = User.of("peter");
23         static final User KLAUS = User.of("klaus");
24
25         static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
26         {
27                         new KeyValue<>(
28                                         TestWord.of(PETER.getUser(),"Hallo"),
29                                         TestCounter.of(PETER.getUser(),"Hallo",1)),
30                         new KeyValue<>(
31                                         TestWord.of(KLAUS.getUser(),"Müsch"),
32                                         TestCounter.of(KLAUS.getUser(),"Müsch",1)),
33                         new KeyValue<>(
34                                         TestWord.of(PETER.getUser(),"Welt"),
35                                         TestCounter.of(PETER.getUser(),"Welt",1)),
36                         new KeyValue<>(
37                                         TestWord.of(KLAUS.getUser(),"Müsch"),
38                                         TestCounter.of(KLAUS.getUser(),"Müsch",2)),
39                         new KeyValue<>(
40                                         TestWord.of(KLAUS.getUser(),"s"),
41                                         TestCounter.of(KLAUS.getUser(),"s",1)),
42                         new KeyValue<>(
43                                         TestWord.of(PETER.getUser(),"Boäh"),
44                                         TestCounter.of(PETER.getUser(),"Boäh",1)),
45                         new KeyValue<>(
46                                         TestWord.of(PETER.getUser(),"Welt"),
47                                         TestCounter.of(PETER.getUser(),"Welt",2)),
48                         new KeyValue<>(
49                                         TestWord.of(PETER.getUser(),"Boäh"),
50                                         TestCounter.of(PETER.getUser(),"Boäh",2)),
51                         new KeyValue<>(
52                                         TestWord.of(KLAUS.getUser(),"s"),
53                                         TestCounter.of(KLAUS.getUser(),"s",2)),
54                         new KeyValue<>(
55                                         TestWord.of(PETER.getUser(),"Boäh"),
56                                         TestCounter.of(PETER.getUser(),"Boäh",3)),
57                         new KeyValue<>(
58                                         TestWord.of(KLAUS.getUser(),"s"),
59                                         TestCounter.of(KLAUS.getUser(),"s",3)),
60         };
61
62         static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
63         {
64                 expectedMessages().forEach(
65                                 (user, rankings) ->
66                                                 assertThat(receivedMessages.get(user))
67                                                                 .containsExactlyElementsOf(rankings));
68         }
69
70         static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
71         {
72                 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
73                 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
74         }
75
76         static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
77         {
78                 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
79                 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
80         }
81
82         static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
83         {
84                 assertThat(ranking).isEqualTo(getLastMessageFor(user));
85         }
86
87         static Ranking getLastMessageFor(User user)
88         {
89                 return getLastMessageFor(user, expectedMessages());
90         }
91
92         static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
93         {
94                 return messagesForUsers
95                                 .get(user)
96                                 .stream()
97                                 .reduce(null, (left, right) -> right);
98         }
99
100         static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
101         {
102                         KeyValue.pair( // 0
103                                         PETER,
104                                         Ranking.of(
105                                                         Entry.of("Hallo", 1l))),
106                         KeyValue.pair( // 1
107                                         KLAUS,
108                                         Ranking.of(
109                                                         Entry.of("Müsch", 1l))),
110                         KeyValue.pair( // 2
111                                         PETER,
112                                         Ranking.of(
113                                                         Entry.of("Hallo", 1l),
114                                                         Entry.of("Welt", 1l))),
115                         KeyValue.pair( // 3
116                                         KLAUS,
117                                         Ranking.of(
118                                                         Entry.of("Müsch", 2l))),
119                         KeyValue.pair( // 4
120                                         KLAUS,
121                                         Ranking.of(
122                                                         Entry.of("Müsch", 2l),
123                                                         Entry.of("s", 1l))),
124                         KeyValue.pair( // 5
125                                         PETER,
126                                         Ranking.of(
127                                                         Entry.of("Hallo", 1l),
128                                                         Entry.of("Welt", 1l),
129                                                         Entry.of("Boäh", 1l))),
130                         KeyValue.pair( // 6
131                                         PETER,
132                                         Ranking.of(
133                                                         Entry.of("Welt", 2l),
134                                                         Entry.of("Hallo", 1l),
135                                                         Entry.of("Boäh", 1l))),
136                         KeyValue.pair( // 7
137                                         PETER,
138                                         Ranking.of(
139                                                         Entry.of("Welt", 2l),
140                                                         Entry.of("Boäh", 2l),
141                                                         Entry.of("Hallo", 1l))),
142                         KeyValue.pair( // 8
143                                         KLAUS,
144                                         Ranking.of(
145                                                         Entry.of("Müsch", 2l),
146                                                         Entry.of("s", 2l))),
147                         KeyValue.pair( // 9
148                                         PETER,
149                                         Ranking.of(
150                                                         Entry.of("Boäh", 3l),
151                                                         Entry.of("Welt", 2l),
152                                                         Entry.of("Hallo", 1l))),
153                         KeyValue.pair( // 10
154                                         KLAUS,
155                                         Ranking.of(
156                                                         Entry.of("s", 3l),
157                                                         Entry.of("Müsch", 2l))),
158         };
159
160         static MultiValueMap<User, Ranking> expectedMessages()
161         {
162                 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
163                 Stream
164                                 .of(EXPECTED_MESSAGES)
165                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
166                 return expectedMessages;
167         }
168
169         static Map<String, Object> convertToMap(Properties properties)
170         {
171                 return properties
172                                 .entrySet()
173                                 .stream()
174                                 .collect(
175                                                 Collectors.toMap(
176                                                                 entry -> (String)entry.getKey(),
177                                                                 entry -> entry.getValue()
178                                                 ));
179         }
180
181         static String parseHeader(Headers headers, String key)
182         {
183                 Header header = headers.lastHeader(key);
184                 if (header == null)
185                 {
186                         return key + "=null";
187                 }
188                 else
189                 {
190                         return key + "=" + new String(header.value());
191                 }
192         }
193 }