top10: 1.4.0 - Refined output JSON -- ALIGN
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestEntry;
6 import de.juplo.kafka.wordcount.query.TestRanking;
7 import de.juplo.kafka.wordcount.query.TestStats;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
12
13 import java.util.Arrays;
14 import java.util.stream.Stream;
15
16 import static org.assertj.core.api.Assertions.assertThat;
17
18
19 class TestData
20 {
21         static final String TYPE_COUNTER = "COUNTER";
22
23         static final TestStats PETER = TestStats.of(StatsType.COUNTER.name(), "peter");
24         static final TestStats KLAUS = TestStats.of(StatsType.COUNTER.name(), "klaus");
25
26         static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
27         {
28                 return Stream.of(INPUT_MESSAGES);
29         }
30
31         private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
32         {
33                         new KeyValue<>(
34                                         TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Hallo"),
35                                         TestCounter.of("Hallo",1)),
36                         new KeyValue<>(
37                                         TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"),
38                                         TestCounter.of("Müsch",1)),
39                         new KeyValue<>(
40                                         TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"),
41                                         TestCounter.of("Welt",1)),
42                         new KeyValue<>(
43                                         TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"),
44                                         TestCounter.of("Müsch",2)),
45                         new KeyValue<>(
46                                         TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
47                                         TestCounter.of("s",1)),
48                         new KeyValue<>(
49                                         TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
50                                         TestCounter.of("Boäh",1)),
51                         new KeyValue<>(
52                                         TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"),
53                                         TestCounter.of("Welt",2)),
54                         new KeyValue<>(
55                                         TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
56                                         TestCounter.of("Boäh",2)),
57                         new KeyValue<>(
58                                         TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
59                                         TestCounter.of("s",2)),
60                         new KeyValue<>(
61                                         TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
62                                         TestCounter.of("Boäh",3)),
63                         new KeyValue<>(
64                                         TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
65                                         TestCounter.of("s",3)),
66         };
67
68         static void assertExpectedMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
69         {
70                 expectedMessages().forEach(
71                                 (stats, rankings) ->
72                                                 assertThat(receivedMessages.get(stats))
73                                                                 .containsExactlyElementsOf(rankings));
74         }
75
76         static void assertExpectedState(ReadOnlyKeyValueStore<Stats, Ranking> store)
77         {
78                 assertRankingEqualsRankingFromLastMessage(PETER, store.get(statsOf(PETER)));
79                 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(statsOf(KLAUS)));
80         }
81
82         private static Stats statsOf(TestStats stats)
83         {
84                 return Stats.of(
85                                 StatsType.valueOf(stats.getType()),
86                                 stats.getChannel());
87         }
88
89         static void assertExpectedNumberOfMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
90         {
91                 assertThat(countMessages(PETER, receivedMessages));
92                 assertThat(countMessages(KLAUS, receivedMessages));
93         }
94
95         private static int countMessages(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
96         {
97                 return messagesFor.get(stats) == null
98                                 ? 0
99                                 : messagesFor.get(stats).size();
100         }
101
102
103         static void assertExpectedLastMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
104         {
105                 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
106                 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
107         }
108
109         private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, Ranking ranking)
110         {
111                 TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
112                 assertRankingEqualsRankingFromLastMessage(stats, testRanking);
113         }
114
115         private static TestEntry[] testEntriesOf(Entry... entries)
116         {
117                 return Arrays
118                                 .stream(entries)
119                                 .map(entry -> TestEntry.of(
120                                                 entry.getKey(),
121                                                 entry.getCounter() == null
122                                                                 ? -1l
123                                                                 : entry.getCounter()))
124                                 .toArray(size -> new TestEntry[size]);
125         }
126
127         private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, TestRanking ranking)
128         {
129                 assertThat(ranking).isEqualTo(getLastMessageFor(stats));
130         }
131
132         private static TestRanking getLastMessageFor(TestStats stats)
133         {
134                 return getLastMessageFor(stats, expectedMessages());
135         }
136
137         private static TestRanking getLastMessageFor(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
138         {
139                 return messagesFor
140                                 .get(stats)
141                                 .stream()
142                                 .reduce(null, (left, right) -> right);
143         }
144
145         private static KeyValue<TestStats, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
146         {
147                         KeyValue.pair( // 0
148                                         PETER,
149                                         TestRanking.of(
150                                                         TestEntry.of("Hallo", 1l))),
151                         KeyValue.pair( // 1
152                                         KLAUS,
153                                         TestRanking.of(
154                                                         TestEntry.of("Müsch", 1l))),
155                         KeyValue.pair( // 2
156                                         PETER,
157                                         TestRanking.of(
158                                                         TestEntry.of("Hallo", 1l),
159                                                         TestEntry.of("Welt", 1l))),
160                         KeyValue.pair( // 3
161                                         KLAUS,
162                                         TestRanking.of(
163                                                         TestEntry.of("Müsch", 2l))),
164                         KeyValue.pair( // 4
165                                         KLAUS,
166                                         TestRanking.of(
167                                                         TestEntry.of("Müsch", 2l),
168                                                         TestEntry.of("s", 1l))),
169                         KeyValue.pair( // 5
170                                         PETER,
171                                         TestRanking.of(
172                                                         TestEntry.of("Hallo", 1l),
173                                                         TestEntry.of("Welt", 1l),
174                                                         TestEntry.of("Boäh", 1l))),
175                         KeyValue.pair( // 6
176                                         PETER,
177                                         TestRanking.of(
178                                                         TestEntry.of("Welt", 2l),
179                                                         TestEntry.of("Hallo", 1l),
180                                                         TestEntry.of("Boäh", 1l))),
181                         KeyValue.pair( // 7
182                                         PETER,
183                                         TestRanking.of(
184                                                         TestEntry.of("Welt", 2l),
185                                                         TestEntry.of("Boäh", 2l),
186                                                         TestEntry.of("Hallo", 1l))),
187                         KeyValue.pair( // 8
188                                         KLAUS,
189                                         TestRanking.of(
190                                                         TestEntry.of("Müsch", 2l),
191                                                         TestEntry.of("s", 2l))),
192                         KeyValue.pair( // 9
193                                         PETER,
194                                         TestRanking.of(
195                                                         TestEntry.of("Boäh", 3l),
196                                                         TestEntry.of("Welt", 2l),
197                                                         TestEntry.of("Hallo", 1l))),
198                         KeyValue.pair( // 10
199                                         KLAUS,
200                                         TestRanking.of(
201                                                         TestEntry.of("s", 3l),
202                                                         TestEntry.of("Müsch", 2l))),
203         };
204
205         private static MultiValueMap<TestStats, TestRanking> expectedMessages()
206         {
207                 MultiValueMap<TestStats, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
208                 Stream
209                                 .of(EXPECTED_MESSAGES)
210                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
211                 return expectedMessages;
212         }
213 }