1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.streams.KeyValue;
6 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
7 import org.springframework.util.LinkedMultiValueMap;
8 import org.springframework.util.MultiValueMap;
10 import java.util.stream.Stream;
12 import static org.assertj.core.api.Assertions.assertThat;
/** User fixture built via {@code User.of("peter")}; referenced by the data and assertions in this file. */
17 static final User PETER = User.of("peter");
/** User fixture built via {@code User.of("klaus")}; referenced by the data and assertions in this file. */
18 static final User KLAUS = User.of("klaus");
/**
 * Key/value pairs named as the input messages of the test data.
 * Each pair has a {@code TestWord} key (user name, word) and a {@code TestCounter}
 * value (user name, word, count). In the visible rows the count for a given
 * user/word combination grows with each occurrence (e.g. "Boäh" for PETER: 1, 2, 3).
 * The array is created raw ({@code new KeyValue[]}) because Java does not allow
 * generic array creation.
 */
20 static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
23 TestWord.of(PETER.getUser(),"Hallo"),
24 TestCounter.of(PETER.getUser(),"Hallo",1)),
26 TestWord.of(KLAUS.getUser(),"Müsch"),
27 TestCounter.of(KLAUS.getUser(),"Müsch",1)),
29 TestWord.of(PETER.getUser(),"Welt"),
30 TestCounter.of(PETER.getUser(),"Welt",1)),
32 TestWord.of(KLAUS.getUser(),"Müsch"),
33 TestCounter.of(KLAUS.getUser(),"Müsch",2)),
35 TestWord.of(KLAUS.getUser(),"s"),
36 TestCounter.of(KLAUS.getUser(),"s",1)),
38 TestWord.of(PETER.getUser(),"Boäh"),
39 TestCounter.of(PETER.getUser(),"Boäh",1)),
41 TestWord.of(PETER.getUser(),"Welt"),
42 TestCounter.of(PETER.getUser(),"Welt",2)),
44 TestWord.of(PETER.getUser(),"Boäh"),
45 TestCounter.of(PETER.getUser(),"Boäh",2)),
47 TestWord.of(KLAUS.getUser(),"s"),
48 TestCounter.of(KLAUS.getUser(),"s",2)),
50 TestWord.of(PETER.getUser(),"Boäh"),
51 TestCounter.of(PETER.getUser(),"Boäh",3)),
53 TestWord.of(KLAUS.getUser(),"s"),
54 TestCounter.of(KLAUS.getUser(),"s",3)),
57 static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
59 expectedMessages().forEach(
61 assertThat(receivedMessages.get(user))
62 .containsExactlyElementsOf(rankings));
65 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
67 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
68 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
71 static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
73 assertThat(countMessagesForUser(PETER, receivedMessages));
74 assertThat(countMessagesForUser(KLAUS, receivedMessages));
77 static int countMessagesForUser(User user, MultiValueMap<User, Ranking> messagesForUsers)
79 return messagesForUsers.get(user).size();
83 static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
85 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
86 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
89 static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
91 assertThat(ranking).isEqualTo(getLastMessageFor(user));
94 static Ranking getLastMessageFor(User user)
96 return getLastMessageFor(user, expectedMessages());
99 static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
101 return messagesForUsers
104 .reduce(null, (left, right) -> right);
/**
 * Key/value pairs named as the expected messages of the test data: a {@code User}
 * key paired with a {@code Ranking} value.
 * Each visible {@code Entry} combines a word with a {@code long} count. Where several
 * entries of one ranking are visible, they appear with non-increasing counts
 * (e.g. "Boäh" 3, "Welt" 2, "Hallo" 1).
 * The array is created raw ({@code new KeyValue[]}) because Java does not allow
 * generic array creation.
 * <p>
 * NOTE(review): unlike {@code INPUT_MESSAGES} this field is not declared
 * {@code final} - presumably an oversight; confirm and align.
 * NOTE(review): the {@code long} literals use a lower-case {@code l} suffix, which is
 * easily misread as the digit one; {@code 1L} would be clearer.
 */
107 static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
112 Entry.of("Hallo", 1l))),
116 Entry.of("Müsch", 1l))),
120 Entry.of("Hallo", 1l),
121 Entry.of("Welt", 1l))),
125 Entry.of("Müsch", 2l))),
129 Entry.of("Müsch", 2l),
134 Entry.of("Hallo", 1l),
135 Entry.of("Welt", 1l),
136 Entry.of("Boäh", 1l))),
140 Entry.of("Welt", 2l),
141 Entry.of("Hallo", 1l),
142 Entry.of("Boäh", 1l))),
146 Entry.of("Welt", 2l),
147 Entry.of("Boäh", 2l),
148 Entry.of("Hallo", 1l))),
152 Entry.of("Müsch", 2l),
157 Entry.of("Boäh", 3l),
158 Entry.of("Welt", 2l),
159 Entry.of("Hallo", 1l))),
164 Entry.of("Müsch", 2l))),
167 static MultiValueMap<User, Ranking> expectedMessages()
169 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
171 .of(EXPECTED_MESSAGES)
172 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
173 return expectedMessages;