1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestEntry;
6 import de.juplo.kafka.wordcount.query.TestRanking;
7 import de.juplo.kafka.wordcount.query.TestUser;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
13 import java.util.Arrays;
14 import java.util.stream.Stream;
16 import static org.assertj.core.api.Assertions.assertThat;
21 static final TestUser PETER = TestUser.of("peter");
22 static final TestUser KLAUS = TestUser.of("klaus");
24 static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
27 TestWord.of(PETER.getUser(),"Hallo"),
28 TestCounter.of(PETER.getUser(),"Hallo",1)),
30 TestWord.of(KLAUS.getUser(),"Müsch"),
31 TestCounter.of(KLAUS.getUser(),"Müsch",1)),
33 TestWord.of(PETER.getUser(),"Welt"),
34 TestCounter.of(PETER.getUser(),"Welt",1)),
36 TestWord.of(KLAUS.getUser(),"Müsch"),
37 TestCounter.of(KLAUS.getUser(),"Müsch",2)),
39 TestWord.of(KLAUS.getUser(),"s"),
40 TestCounter.of(KLAUS.getUser(),"s",1)),
42 TestWord.of(PETER.getUser(),"Boäh"),
43 TestCounter.of(PETER.getUser(),"Boäh",1)),
45 TestWord.of(PETER.getUser(),"Welt"),
46 TestCounter.of(PETER.getUser(),"Welt",2)),
48 TestWord.of(PETER.getUser(),"Boäh"),
49 TestCounter.of(PETER.getUser(),"Boäh",2)),
51 TestWord.of(KLAUS.getUser(),"s"),
52 TestCounter.of(KLAUS.getUser(),"s",2)),
54 TestWord.of(PETER.getUser(),"Boäh"),
55 TestCounter.of(PETER.getUser(),"Boäh",3)),
57 TestWord.of(KLAUS.getUser(),"s"),
58 TestCounter.of(KLAUS.getUser(),"s",3)),
61 static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
63 expectedMessages().forEach(
65 assertThat(receivedMessages.get(user))
66 .containsExactlyElementsOf(rankings));
69 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
71 assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
72 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
75 static User userOf(TestUser user)
77 return User.of(user.getUser());
80 static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
82 assertThat(countMessagesForUser(PETER, receivedMessages));
83 assertThat(countMessagesForUser(KLAUS, receivedMessages));
86 static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
88 return messagesForUsers.get(user) == null
90 : messagesForUsers.get(user).size();
94 static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
96 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
97 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
100 static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
102 TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
103 assertRankingEqualsRankingFromLastMessage(user, testRanking);
/*
 * Maps the given entries to TestEntry instances and collects them into a
 * TestEntry array. The counter of each entry is null-checked: a non-null
 * counter is passed on unchanged.
 * NOTE(review): the replacement value used for a null counter is not visible
 * in this excerpt -- check the full file before relying on it.
 */
106 static TestEntry[] testEntriesOf(Entry... entries)
110 .map(entry -> TestEntry.of(
112 entry.getCounter() == null
114 : entry.getCounter()))
115 .toArray(size -> new TestEntry[size]);
118 static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
120 assertThat(ranking).isEqualTo(getLastMessageFor(user));
123 static TestRanking getLastMessageFor(TestUser user)
125 return getLastMessageFor(user, expectedMessages());
128 static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
130 return messagesForUsers
133 .reduce(null, (left, right) -> right);
136 static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
141 TestEntry.of("Hallo", 1l))),
145 TestEntry.of("Müsch", 1l))),
149 TestEntry.of("Hallo", 1l),
150 TestEntry.of("Welt", 1l))),
154 TestEntry.of("Müsch", 2l))),
158 TestEntry.of("Müsch", 2l),
159 TestEntry.of("s", 1l))),
163 TestEntry.of("Hallo", 1l),
164 TestEntry.of("Welt", 1l),
165 TestEntry.of("Boäh", 1l))),
169 TestEntry.of("Welt", 2l),
170 TestEntry.of("Hallo", 1l),
171 TestEntry.of("Boäh", 1l))),
175 TestEntry.of("Welt", 2l),
176 TestEntry.of("Boäh", 2l),
177 TestEntry.of("Hallo", 1l))),
181 TestEntry.of("Müsch", 2l),
182 TestEntry.of("s", 2l))),
186 TestEntry.of("Boäh", 3l),
187 TestEntry.of("Welt", 2l),
188 TestEntry.of("Hallo", 1l))),
192 TestEntry.of("s", 3l),
193 TestEntry.of("Müsch", 2l))),
196 static MultiValueMap<TestUser, TestRanking> expectedMessages()
198 MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
200 .of(EXPECTED_MESSAGES)
201 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
202 return expectedMessages;