1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestEntry;
6 import de.juplo.kafka.wordcount.query.TestRanking;
7 import de.juplo.kafka.wordcount.query.TestStats;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
13 import java.util.Arrays;
14 import java.util.stream.Stream;
16 import static org.assertj.core.api.Assertions.assertThat;
21 static final String TYPE_COUNTER = "COUNTER";
23 static final TestStats PETER = TestStats.of(StatsType.COUNTER.name(), "peter");
24 static final TestStats KLAUS = TestStats.of(StatsType.COUNTER.name(), "klaus");
26 static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
28 return Stream.of(INPUT_MESSAGES);
31 private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
34 TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Hallo"),
35 TestCounter.of("Hallo",1)),
37 TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"),
38 TestCounter.of("Müsch",1)),
40 TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"),
41 TestCounter.of("Welt",1)),
43 TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"),
44 TestCounter.of("Müsch",2)),
46 TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
47 TestCounter.of("s",1)),
49 TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
50 TestCounter.of("Boäh",1)),
52 TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"),
53 TestCounter.of("Welt",2)),
55 TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
56 TestCounter.of("Boäh",2)),
58 TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
59 TestCounter.of("s",2)),
61 TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
62 TestCounter.of("Boäh",3)),
64 TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
65 TestCounter.of("s",3)),
68 static void assertExpectedMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
70 expectedMessages().forEach(
72 assertThat(receivedMessages.get(stats))
73 .containsExactlyElementsOf(rankings));
76 static void assertExpectedState(ReadOnlyKeyValueStore<Stats, Ranking> store)
78 assertRankingEqualsRankingFromLastMessage(PETER, store.get(statsOf(PETER)));
79 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(statsOf(KLAUS)));
82 private static Stats statsOf(TestStats stats)
85 StatsType.valueOf(stats.getType()),
89 static void assertExpectedNumberOfMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
91 assertThat(countMessages(PETER, receivedMessages));
92 assertThat(countMessages(KLAUS, receivedMessages));
95 private static int countMessages(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
97 return messagesFor.get(stats) == null
99 : messagesFor.get(stats).size();
103 static void assertExpectedLastMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
105 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
106 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
109 private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, Ranking ranking)
111 TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
112 assertRankingEqualsRankingFromLastMessage(stats, testRanking);
115 private static TestEntry[] testEntriesOf(Entry... entries)
119 .map(entry -> TestEntry.of(
121 entry.getCounter() == null
123 : entry.getCounter()))
124 .toArray(size -> new TestEntry[size]);
127 private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, TestRanking ranking)
129 assertThat(ranking).isEqualTo(getLastMessageFor(stats));
132 private static TestRanking getLastMessageFor(TestStats stats)
134 return getLastMessageFor(stats, expectedMessages());
137 private static TestRanking getLastMessageFor(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
142 .reduce(null, (left, right) -> right);
145 private static KeyValue<TestStats, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
150 TestEntry.of("Hallo", 1l))),
154 TestEntry.of("Müsch", 1l))),
158 TestEntry.of("Hallo", 1l),
159 TestEntry.of("Welt", 1l))),
163 TestEntry.of("Müsch", 2l))),
167 TestEntry.of("Müsch", 2l),
168 TestEntry.of("s", 1l))),
172 TestEntry.of("Hallo", 1l),
173 TestEntry.of("Welt", 1l),
174 TestEntry.of("Boäh", 1l))),
178 TestEntry.of("Welt", 2l),
179 TestEntry.of("Hallo", 1l),
180 TestEntry.of("Boäh", 1l))),
184 TestEntry.of("Welt", 2l),
185 TestEntry.of("Boäh", 2l),
186 TestEntry.of("Hallo", 1l))),
190 TestEntry.of("Müsch", 2l),
191 TestEntry.of("s", 2l))),
195 TestEntry.of("Boäh", 3l),
196 TestEntry.of("Welt", 2l),
197 TestEntry.of("Hallo", 1l))),
201 TestEntry.of("s", 3l),
202 TestEntry.of("Müsch", 2l))),
205 private static MultiValueMap<TestStats, TestRanking> expectedMessages()
207 MultiValueMap<TestStats, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
209 .of(EXPECTED_MESSAGES)
210 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
211 return expectedMessages;