1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestEntry;
6 import de.juplo.kafka.wordcount.query.TestRanking;
7 import de.juplo.kafka.wordcount.query.TestUser;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
13 import java.util.Arrays;
14 import java.util.stream.Stream;
16 import static org.assertj.core.api.Assertions.assertThat;
21 static final String TYPE_COUNTER = "COUNTER";
23 static final TestUser PETER = TestUser.of("peter");
24 static final TestUser KLAUS = TestUser.of("klaus");
26 static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
28 return Stream.of(INPUT_MESSAGES);
31 private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
34 TestWord.of(TYPE_COUNTER, PETER.getUser(),"Hallo"),
35 TestCounter.of("Hallo",1)),
37 TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"),
38 TestCounter.of("Müsch",1)),
40 TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"),
41 TestCounter.of("Welt",1)),
43 TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"),
44 TestCounter.of("Müsch",2)),
46 TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
47 TestCounter.of("s",1)),
49 TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
50 TestCounter.of("Boäh",1)),
52 TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"),
53 TestCounter.of("Welt",2)),
55 TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
56 TestCounter.of("Boäh",2)),
58 TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
59 TestCounter.of("s",2)),
61 TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
62 TestCounter.of("Boäh",3)),
64 TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
65 TestCounter.of("s",3)),
68 static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
70 expectedMessages().forEach(
72 assertThat(receivedMessages.get(user))
73 .containsExactlyElementsOf(rankings));
76 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
78 assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
79 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
82 private static User userOf(TestUser user)
84 return User.of(user.getUser());
87 static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
89 assertThat(countMessagesForUser(PETER, receivedMessages));
90 assertThat(countMessagesForUser(KLAUS, receivedMessages));
93 private static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
95 return messagesForUsers.get(user) == null
97 : messagesForUsers.get(user).size();
101 static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
103 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
104 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
107 private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
109 TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
110 assertRankingEqualsRankingFromLastMessage(user, testRanking);
113 private static TestEntry[] testEntriesOf(Entry... entries)
117 .map(entry -> TestEntry.of(
119 entry.getCounter() == null
121 : entry.getCounter()))
122 .toArray(size -> new TestEntry[size]);
125 private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
127 assertThat(ranking).isEqualTo(getLastMessageFor(user));
130 private static TestRanking getLastMessageFor(TestUser user)
132 return getLastMessageFor(user, expectedMessages());
135 private static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
137 return messagesForUsers
140 .reduce(null, (left, right) -> right);
143 private static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
148 TestEntry.of("Hallo", 1l))),
152 TestEntry.of("Müsch", 1l))),
156 TestEntry.of("Hallo", 1l),
157 TestEntry.of("Welt", 1l))),
161 TestEntry.of("Müsch", 2l))),
165 TestEntry.of("Müsch", 2l),
166 TestEntry.of("s", 1l))),
170 TestEntry.of("Hallo", 1l),
171 TestEntry.of("Welt", 1l),
172 TestEntry.of("Boäh", 1l))),
176 TestEntry.of("Welt", 2l),
177 TestEntry.of("Hallo", 1l),
178 TestEntry.of("Boäh", 1l))),
182 TestEntry.of("Welt", 2l),
183 TestEntry.of("Boäh", 2l),
184 TestEntry.of("Hallo", 1l))),
188 TestEntry.of("Müsch", 2l),
189 TestEntry.of("s", 2l))),
193 TestEntry.of("Boäh", 3l),
194 TestEntry.of("Welt", 2l),
195 TestEntry.of("Hallo", 1l))),
199 TestEntry.of("s", 3l),
200 TestEntry.of("Müsch", 2l))),
203 private static MultiValueMap<TestUser, TestRanking> expectedMessages()
205 MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
207 .of(EXPECTED_MESSAGES)
208 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
209 return expectedMessages;