1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import de.juplo.kafka.wordcount.query.TestEntry;
6 import de.juplo.kafka.wordcount.query.TestRanking;
7 import de.juplo.kafka.wordcount.query.TestUser;
8 import org.apache.kafka.streams.KeyValue;
9 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import org.springframework.util.LinkedMultiValueMap;
11 import org.springframework.util.MultiValueMap;
13 import java.util.Arrays;
14 import java.util.stream.Stream;
16 import static org.assertj.core.api.Assertions.assertThat;
/** Test user created from the name "peter"; referenced by the test messages and assertions in this class. */
21 static final TestUser PETER = TestUser.of("peter");
/** Test user created from the name "klaus"; referenced by the test messages and assertions in this class. */
22 static final TestUser KLAUS = TestUser.of("klaus");
/**
 * Input records for the tests, as key/value pairs of {@code TestWord} and {@code TestCounter}.
 *
 * Each pair is built from the same user name and word; the {@code TestCounter} additionally
 * carries a number. For a given user/word combination that number goes up by one on each
 * repetition (for example "Boäh" for PETER appears with 1, 2 and 3).
 *
 * NOTE(review): the array is created as a raw {@code KeyValue[]} and assigned to a generic
 * array type, which produces an unchecked warning - presumably accepted for test data.
 */
24 static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
27 TestWord.of(PETER.getUser(),"Hallo"),
28 TestCounter.of(PETER.getUser(),"Hallo",1)),
30 TestWord.of(KLAUS.getUser(),"Müsch"),
31 TestCounter.of(KLAUS.getUser(),"Müsch",1)),
33 TestWord.of(PETER.getUser(),"Welt"),
34 TestCounter.of(PETER.getUser(),"Welt",1)),
36 TestWord.of(KLAUS.getUser(),"Müsch"),
37 TestCounter.of(KLAUS.getUser(),"Müsch",2)),
39 TestWord.of(KLAUS.getUser(),"s"),
40 TestCounter.of(KLAUS.getUser(),"s",1)),
42 TestWord.of(PETER.getUser(),"Boäh"),
43 TestCounter.of(PETER.getUser(),"Boäh",1)),
45 TestWord.of(PETER.getUser(),"Welt"),
46 TestCounter.of(PETER.getUser(),"Welt",2)),
48 TestWord.of(PETER.getUser(),"Boäh"),
49 TestCounter.of(PETER.getUser(),"Boäh",2)),
51 TestWord.of(KLAUS.getUser(),"s"),
52 TestCounter.of(KLAUS.getUser(),"s",2)),
54 TestWord.of(PETER.getUser(),"Boäh"),
55 TestCounter.of(PETER.getUser(),"Boäh",3)),
57 TestWord.of(KLAUS.getUser(),"s"),
58 TestCounter.of(KLAUS.getUser(),"s",3)),
61 static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
63 expectedMessages().forEach(
65 assertThat(receivedMessages.get(user))
66 .containsExactlyElementsOf(rankings));
69 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
71 assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
72 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
75 static User userOf(TestUser user)
77 return User.of(user.getUser());
80 static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
82 assertThat(countMessagesForUser(PETER, receivedMessages));
83 assertThat(countMessagesForUser(KLAUS, receivedMessages));
86 static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
88 return messagesForUsers.get(user).size();
92 static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
94 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
95 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
98 static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
100 TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
101 assertRankingEqualsRankingFromLastMessage(user, testRanking);
/**
 * Converts {@code Entry} objects into {@code TestEntry} objects and returns them as an array.
 *
 * Each entry is mapped through {@code TestEntry.of(...)}. The counter argument is chosen by a
 * conditional on {@code entry.getCounter() == null}: a non-null counter is passed through
 * unchanged.
 * NOTE(review): confirm which substitute value is used when the counter is null.
 *
 * @param entries the entries to convert
 * @return a newly allocated {@code TestEntry[]} holding the converted entries
 */
104 static TestEntry[] testEntriesOf(Entry... entries)
108 .map(entry -> TestEntry.of(
110 entry.getCounter() == null
112 : entry.getCounter()))
113 .toArray(size -> new TestEntry[size]);
116 static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
118 assertThat(ranking).isEqualTo(getLastMessageFor(user));
121 static TestRanking getLastMessageFor(TestUser user)
123 return getLastMessageFor(user, expectedMessages());
126 static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
128 return messagesForUsers
131 .reduce(null, (left, right) -> right);
/**
 * Expected output records as key/value pairs of {@code TestUser} and {@code TestRanking}.
 *
 * {@code expectedMessages()} reads this array in order and groups the rankings per user.
 * The rankings are assembled from {@code TestEntry} instances, each made of a word and a
 * {@code long} count.
 *
 * NOTE(review): unlike {@code INPUT_MESSAGES}, this field is not declared {@code final},
 * so it can be reassigned - consider adding {@code final}.
 * NOTE(review): the lowercase {@code l} suffix on the long literals is easily misread as
 * the digit 1 (e.g. {@code 1l}); an uppercase {@code L} would be clearer.
 */
134 static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
139 TestEntry.of("Hallo", 1l))),
143 TestEntry.of("Müsch", 1l))),
147 TestEntry.of("Hallo", 1l),
148 TestEntry.of("Welt", 1l))),
152 TestEntry.of("Müsch", 2l))),
156 TestEntry.of("Müsch", 2l),
157 TestEntry.of("s", 1l))),
161 TestEntry.of("Hallo", 1l),
162 TestEntry.of("Welt", 1l),
163 TestEntry.of("Boäh", 1l))),
167 TestEntry.of("Welt", 2l),
168 TestEntry.of("Hallo", 1l),
169 TestEntry.of("Boäh", 1l))),
173 TestEntry.of("Welt", 2l),
174 TestEntry.of("Boäh", 2l),
175 TestEntry.of("Hallo", 1l))),
179 TestEntry.of("Müsch", 2l),
180 TestEntry.of("s", 2l))),
184 TestEntry.of("Boäh", 3l),
185 TestEntry.of("Welt", 2l),
186 TestEntry.of("Hallo", 1l))),
190 TestEntry.of("s", 3l),
191 TestEntry.of("Müsch", 2l))),
194 static MultiValueMap<TestUser, TestRanking> expectedMessages()
196 MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
198 .of(EXPECTED_MESSAGES)
199 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
200 return expectedMessages;