eb302982accae749b447e74b9a54a4777be8c5df
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
1 package de.juplo.kafka.wordcount.top10;
2
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
18
19
20 class TestData
21 {
22         static final User PETER = User.of("peter");
23         static final User KLAUS = User.of("klaus");
24
25         static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
26         {
27                         new KeyValue<>(
28                                         TestWord.of(PETER.getUser(),"Hallo"),
29                                         TestCounter.of(PETER.getUser(),"Hallo",1)),
30                         new KeyValue<>(
31                                         TestWord.of(KLAUS.getUser(),"Müsch"),
32                                         TestCounter.of(KLAUS.getUser(),"Müsch",1)),
33                         new KeyValue<>(
34                                         TestWord.of(PETER.getUser(),"Welt"),
35                                         TestCounter.of(PETER.getUser(),"Welt",1)),
36                         new KeyValue<>(
37                                         TestWord.of(KLAUS.getUser(),"Müsch"),
38                                         TestCounter.of(KLAUS.getUser(),"Müsch",2)),
39                         new KeyValue<>(
40                                         TestWord.of(KLAUS.getUser(),"s"),
41                                         TestCounter.of(KLAUS.getUser(),"s",1)),
42                         new KeyValue<>(
43                                         TestWord.of(PETER.getUser(),"Boäh"),
44                                         TestCounter.of(PETER.getUser(),"Boäh",1)),
45                         new KeyValue<>(
46                                         TestWord.of(PETER.getUser(),"Welt"),
47                                         TestCounter.of(PETER.getUser(),"Welt",2)),
48                         new KeyValue<>(
49                                         TestWord.of(PETER.getUser(),"Boäh"),
50                                         TestCounter.of(PETER.getUser(),"Boäh",2)),
51                         new KeyValue<>(
52                                         TestWord.of(KLAUS.getUser(),"s"),
53                                         TestCounter.of(KLAUS.getUser(),"s",2)),
54                         new KeyValue<>(
55                                         TestWord.of(PETER.getUser(),"Boäh"),
56                                         TestCounter.of(PETER.getUser(),"Boäh",3)),
57                         new KeyValue<>(
58                                         TestWord.of(KLAUS.getUser(),"s"),
59                                         TestCounter.of(KLAUS.getUser(),"s",3)),
60         };
61
62         static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
63         {
64                 expectedMessages().forEach(
65                                 (user, rankings) ->
66                                                 assertThat(receivedMessages.get(user))
67                                                                 .containsExactlyElementsOf(rankings));
68         }
69
70         static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
71         {
72                 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
73                 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
74         }
75
76         static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
77         {
78                 assertThat(ranking).isEqualTo(getLastMessageFor(user));
79         }
80
81         static Ranking getLastMessageFor(User user)
82         {
83                 return getLastMessageFor(user, expectedMessages());
84         }
85
86         static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
87         {
88                 return messagesForUsers
89                                 .get(user)
90                                 .stream()
91                                 .reduce(null, (left, right) -> right);
92         }
93
94         static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
95         {
96                         KeyValue.pair( // 0
97                                         PETER,
98                                         Ranking.of(
99                                                         Entry.of("Hallo", 1l))),
100                         KeyValue.pair( // 1
101                                         KLAUS,
102                                         Ranking.of(
103                                                         Entry.of("Müsch", 1l))),
104                         KeyValue.pair( // 2
105                                         PETER,
106                                         Ranking.of(
107                                                         Entry.of("Hallo", 1l),
108                                                         Entry.of("Welt", 1l))),
109                         KeyValue.pair( // 3
110                                         KLAUS,
111                                         Ranking.of(
112                                                         Entry.of("Müsch", 2l))),
113                         KeyValue.pair( // 4
114                                         KLAUS,
115                                         Ranking.of(
116                                                         Entry.of("Müsch", 2l),
117                                                         Entry.of("s", 1l))),
118                         KeyValue.pair( // 5
119                                         PETER,
120                                         Ranking.of(
121                                                         Entry.of("Hallo", 1l),
122                                                         Entry.of("Welt", 1l),
123                                                         Entry.of("Boäh", 1l))),
124                         KeyValue.pair( // 6
125                                         PETER,
126                                         Ranking.of(
127                                                         Entry.of("Welt", 2l),
128                                                         Entry.of("Hallo", 1l),
129                                                         Entry.of("Boäh", 1l))),
130                         KeyValue.pair( // 7
131                                         PETER,
132                                         Ranking.of(
133                                                         Entry.of("Welt", 2l),
134                                                         Entry.of("Boäh", 2l),
135                                                         Entry.of("Hallo", 1l))),
136                         KeyValue.pair( // 8
137                                         KLAUS,
138                                         Ranking.of(
139                                                         Entry.of("Müsch", 2l),
140                                                         Entry.of("s", 2l))),
141                         KeyValue.pair( // 9
142                                         PETER,
143                                         Ranking.of(
144                                                         Entry.of("Boäh", 3l),
145                                                         Entry.of("Welt", 2l),
146                                                         Entry.of("Hallo", 1l))),
147                         KeyValue.pair( // 10
148                                         KLAUS,
149                                         Ranking.of(
150                                                         Entry.of("s", 3l),
151                                                         Entry.of("Müsch", 2l))),
152         };
153
154         static MultiValueMap<User, Ranking> expectedMessages()
155         {
156                 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
157                 Stream
158                                 .of(EXPECTED_MESSAGES)
159                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
160                 return expectedMessages;
161         }
162
163         static Map<String, Object> convertToMap(Properties properties)
164         {
165                 return properties
166                                 .entrySet()
167                                 .stream()
168                                 .collect(
169                                                 Collectors.toMap(
170                                                                 entry -> (String)entry.getKey(),
171                                                                 entry -> entry.getValue()
172                                                 ));
173         }
174
175         static String parseHeader(Headers headers, String key)
176         {
177                 Header header = headers.lastHeader(key);
178                 if (header == null)
179                 {
180                         return key + "=null";
181                 }
182                 else
183                 {
184                         return key + "=" + new String(header.value());
185                 }
186         }
187 }