top10: 1.2.1 - (RED) Added an assertion regarding the expected state
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
11
12 import java.util.Map;
13 import java.util.Properties;
14 import java.util.stream.Collectors;
15 import java.util.stream.Stream;
16
17 import static org.assertj.core.api.Assertions.assertThat;
18
19
20 class TestData
21 {
22         static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
23         {
24                         new KeyValue<>(
25                                         TestWord.of("peter","Hallo"),
26                                         TestCounter.of("peter","Hallo",1)),
27                         new KeyValue<>(
28                                         TestWord.of("klaus","Müsch"),
29                                         TestCounter.of("klaus","Müsch",1)),
30                         new KeyValue<>(
31                                         TestWord.of("peter","Welt"),
32                                         TestCounter.of("peter","Welt",1)),
33                         new KeyValue<>(
34                                         TestWord.of("klaus","Müsch"),
35                                         TestCounter.of("klaus","Müsch",2)),
36                         new KeyValue<>(
37                                         TestWord.of("klaus","s"),
38                                         TestCounter.of("klaus","s",1)),
39                         new KeyValue<>(
40                                         TestWord.of("peter","Boäh"),
41                                         TestCounter.of("peter","Boäh",1)),
42                         new KeyValue<>(
43                                         TestWord.of("peter","Welt"),
44                                         TestCounter.of("peter","Welt",2)),
45                         new KeyValue<>(
46                                         TestWord.of("peter","Boäh"),
47                                         TestCounter.of("peter","Boäh",2)),
48                         new KeyValue<>(
49                                         TestWord.of("klaus","s"),
50                                         TestCounter.of("klaus","s",2)),
51                         new KeyValue<>(
52                                         TestWord.of("peter","Boäh"),
53                                         TestCounter.of("peter","Boäh",3)),
54                         new KeyValue<>(
55                                         TestWord.of("klaus","s"),
56                                         TestCounter.of("klaus","s",3)),
57         };
58
59         static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
60         {
61                 expectedMessages().forEach(
62                                 (user, rankings) ->
63                                                 assertThat(receivedMessages.get(user))
64                                                                 .containsExactlyElementsOf(rankings));
65         }
66
67         static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
68         {
69                 assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
70                 assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
71         }
72
73         static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
74         {
75                         KeyValue.pair( // 0
76                                         User.of("peter"),
77                                         Ranking.of(
78                                                         Entry.of("Hallo", 1l))),
79                         KeyValue.pair( // 1
80                                         User.of("klaus"),
81                                         Ranking.of(
82                                                         Entry.of("Müsch", 1l))),
83                         KeyValue.pair( // 2
84                                         User.of("peter"),
85                                         Ranking.of(
86                                                         Entry.of("Hallo", 1l),
87                                                         Entry.of("Welt", 1l))),
88                         KeyValue.pair( // 3
89                                         User.of("klaus"),
90                                         Ranking.of(
91                                                         Entry.of("Müsch", 2l))),
92                         KeyValue.pair( // 4
93                                         User.of("klaus"),
94                                         Ranking.of(
95                                                         Entry.of("Müsch", 2l),
96                                                         Entry.of("s", 1l))),
97                         KeyValue.pair( // 5
98                                         User.of("peter"),
99                                         Ranking.of(
100                                                         Entry.of("Hallo", 1l),
101                                                         Entry.of("Welt", 1l),
102                                                         Entry.of("Boäh", 1l))),
103                         KeyValue.pair( // 6
104                                         User.of("peter"),
105                                         Ranking.of(
106                                                         Entry.of("Welt", 2l),
107                                                         Entry.of("Hallo", 1l),
108                                                         Entry.of("Boäh", 1l))),
109                         KeyValue.pair( // 7
110                                         User.of("peter"),
111                                         Ranking.of(
112                                                         Entry.of("Welt", 2l),
113                                                         Entry.of("Boäh", 2l),
114                                                         Entry.of("Hallo", 1l))),
115                         KeyValue.pair( // 8
116                                         User.of("klaus"),
117                                         Ranking.of(
118                                                         Entry.of("Müsch", 2l),
119                                                         Entry.of("s", 2l))),
120                         KeyValue.pair( // 9
121                                         User.of("peter"),
122                                         Ranking.of(
123                                                         Entry.of("Boäh", 3l),
124                                                         Entry.of("Welt", 2l),
125                                                         Entry.of("Hallo", 1l))),
126                         KeyValue.pair( // 10
127                                         User.of("klaus"),
128                                         Ranking.of(
129                                                         Entry.of("s", 3l),
130                                                         Entry.of("Müsch", 2l))),
131         };
132
133         static MultiValueMap<User, Ranking> expectedMessages()
134         {
135                 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
136                 Stream
137                                 .of(EXPECTED_MESSAGES)
138                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
139                 return expectedMessages;
140         }
141
142         static Map<String, Object> convertToMap(Properties properties)
143         {
144                 return properties
145                                 .entrySet()
146                                 .stream()
147                                 .collect(
148                                                 Collectors.toMap(
149                                                                 entry -> (String)entry.getKey(),
150                                                                 entry -> entry.getValue()
151                                                 ));
152         }
153
154         static String parseHeader(Headers headers, String key)
155         {
156                 Header header = headers.lastHeader(key);
157                 if (header == null)
158                 {
159                         return key + "=null";
160                 }
161                 else
162                 {
163                         return key + "=" + new String(header.value());
164                 }
165         }
166 }