1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestEntry;
6 import de.juplo.kafka.wordcount.query.TestRanking;
7 import de.juplo.kafka.wordcount.query.TestUser;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
13 import java.util.Arrays;
14 import java.util.stream.Stream;
16 import static org.assertj.core.api.Assertions.assertThat;
21 static final TestUser PETER = TestUser.of("peter");
22 static final TestUser KLAUS = TestUser.of("klaus");
24 static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
26 return Stream.of(INPUT_MESSAGES);
29 private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
32 TestWord.of(PETER.getUser(),"Hallo"),
33 TestCounter.of(PETER.getUser(),"Hallo",1)),
35 TestWord.of(KLAUS.getUser(),"Müsch"),
36 TestCounter.of(KLAUS.getUser(),"Müsch",1)),
38 TestWord.of(PETER.getUser(),"Welt"),
39 TestCounter.of(PETER.getUser(),"Welt",1)),
41 TestWord.of(KLAUS.getUser(),"Müsch"),
42 TestCounter.of(KLAUS.getUser(),"Müsch",2)),
44 TestWord.of(KLAUS.getUser(),"s"),
45 TestCounter.of(KLAUS.getUser(),"s",1)),
47 TestWord.of(PETER.getUser(),"Boäh"),
48 TestCounter.of(PETER.getUser(),"Boäh",1)),
50 TestWord.of(PETER.getUser(),"Welt"),
51 TestCounter.of(PETER.getUser(),"Welt",2)),
53 TestWord.of(PETER.getUser(),"Boäh"),
54 TestCounter.of(PETER.getUser(),"Boäh",2)),
56 TestWord.of(KLAUS.getUser(),"s"),
57 TestCounter.of(KLAUS.getUser(),"s",2)),
59 TestWord.of(PETER.getUser(),"Boäh"),
60 TestCounter.of(PETER.getUser(),"Boäh",3)),
62 TestWord.of(KLAUS.getUser(),"s"),
63 TestCounter.of(KLAUS.getUser(),"s",3)),
66 static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
68 expectedMessages().forEach(
70 assertThat(receivedMessages.get(user))
71 .containsExactlyElementsOf(rankings));
74 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
76 assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
77 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
80 private static User userOf(TestUser user)
82 return User.of(user.getUser());
85 static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
87 assertThat(countMessagesForUser(PETER, receivedMessages));
88 assertThat(countMessagesForUser(KLAUS, receivedMessages));
91 private static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
93 return messagesForUsers.get(user) == null
95 : messagesForUsers.get(user).size();
99 static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
101 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
102 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
105 private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
107 TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
108 assertRankingEqualsRankingFromLastMessage(user, testRanking);
111 private static TestEntry[] testEntriesOf(Entry... entries)
115 .map(entry -> TestEntry.of(
117 entry.getCounter() == null
119 : entry.getCounter()))
120 .toArray(size -> new TestEntry[size]);
123 private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
125 assertThat(ranking).isEqualTo(getLastMessageFor(user));
128 private static TestRanking getLastMessageFor(TestUser user)
130 return getLastMessageFor(user, expectedMessages());
133 private static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
135 return messagesForUsers
138 .reduce(null, (left, right) -> right);
141 private static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
146 TestEntry.of("Hallo", 1l))),
150 TestEntry.of("Müsch", 1l))),
154 TestEntry.of("Hallo", 1l),
155 TestEntry.of("Welt", 1l))),
159 TestEntry.of("Müsch", 2l))),
163 TestEntry.of("Müsch", 2l),
164 TestEntry.of("s", 1l))),
168 TestEntry.of("Hallo", 1l),
169 TestEntry.of("Welt", 1l),
170 TestEntry.of("Boäh", 1l))),
174 TestEntry.of("Welt", 2l),
175 TestEntry.of("Hallo", 1l),
176 TestEntry.of("Boäh", 1l))),
180 TestEntry.of("Welt", 2l),
181 TestEntry.of("Boäh", 2l),
182 TestEntry.of("Hallo", 1l))),
186 TestEntry.of("Müsch", 2l),
187 TestEntry.of("s", 2l))),
191 TestEntry.of("Boäh", 3l),
192 TestEntry.of("Welt", 2l),
193 TestEntry.of("Hallo", 1l))),
197 TestEntry.of("s", 3l),
198 TestEntry.of("Müsch", 2l))),
201 private static MultiValueMap<TestUser, TestRanking> expectedMessages()
203 MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
205 .of(EXPECTED_MESSAGES)
206 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
207 return expectedMessages;