1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
12 import java.util.stream.Stream;
14 import static org.assertj.core.api.Assertions.assertThat;
19 static final User PETER = User.of("peter");
20 static final User KLAUS = User.of("klaus");
22 static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
25 TestWord.of(PETER.getUser(),"Hallo"),
26 TestCounter.of(PETER.getUser(),"Hallo",1)),
28 TestWord.of(KLAUS.getUser(),"Müsch"),
29 TestCounter.of(KLAUS.getUser(),"Müsch",1)),
31 TestWord.of(PETER.getUser(),"Welt"),
32 TestCounter.of(PETER.getUser(),"Welt",1)),
34 TestWord.of(KLAUS.getUser(),"Müsch"),
35 TestCounter.of(KLAUS.getUser(),"Müsch",2)),
37 TestWord.of(KLAUS.getUser(),"s"),
38 TestCounter.of(KLAUS.getUser(),"s",1)),
40 TestWord.of(PETER.getUser(),"Boäh"),
41 TestCounter.of(PETER.getUser(),"Boäh",1)),
43 TestWord.of(PETER.getUser(),"Welt"),
44 TestCounter.of(PETER.getUser(),"Welt",2)),
46 TestWord.of(PETER.getUser(),"Boäh"),
47 TestCounter.of(PETER.getUser(),"Boäh",2)),
49 TestWord.of(KLAUS.getUser(),"s"),
50 TestCounter.of(KLAUS.getUser(),"s",2)),
52 TestWord.of(PETER.getUser(),"Boäh"),
53 TestCounter.of(PETER.getUser(),"Boäh",3)),
55 TestWord.of(KLAUS.getUser(),"s"),
56 TestCounter.of(KLAUS.getUser(),"s",3)),
59 static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
61 expectedMessages().forEach(
63 assertThat(receivedMessages.get(user))
64 .containsExactlyElementsOf(rankings));
67 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
69 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
70 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
73 static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
75 assertThat(countMessagesForUser(PETER, receivedMessages));
76 assertThat(countMessagesForUser(KLAUS, receivedMessages));
79 static int countMessagesForUser(User user, MultiValueMap<User, Ranking> messagesForUsers)
81 return messagesForUsers.get(user).size();
85 static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
87 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
88 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
91 static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
93 assertThat(ranking).isEqualTo(getLastMessageFor(user));
96 static Ranking getLastMessageFor(User user)
98 return getLastMessageFor(user, expectedMessages());
101 static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
103 return messagesForUsers
106 .reduce(null, (left, right) -> right);
109 static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
114 Entry.of("Hallo", 1l))),
118 Entry.of("Müsch", 1l))),
122 Entry.of("Hallo", 1l),
123 Entry.of("Welt", 1l))),
127 Entry.of("Müsch", 2l))),
131 Entry.of("Müsch", 2l),
136 Entry.of("Hallo", 1l),
137 Entry.of("Welt", 1l),
138 Entry.of("Boäh", 1l))),
142 Entry.of("Welt", 2l),
143 Entry.of("Hallo", 1l),
144 Entry.of("Boäh", 1l))),
148 Entry.of("Welt", 2l),
149 Entry.of("Boäh", 2l),
150 Entry.of("Hallo", 1l))),
154 Entry.of("Müsch", 2l),
159 Entry.of("Boäh", 3l),
160 Entry.of("Welt", 2l),
161 Entry.of("Hallo", 1l))),
166 Entry.of("Müsch", 2l))),
169 static MultiValueMap<User, Ranking> expectedMessages()
171 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
173 .of(EXPECTED_MESSAGES)
174 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
175 return expectedMessages;
178 static String parseHeader(Headers headers, String key)
180 Header header = headers.lastHeader(key);
183 return key + "=null";
187 return key + "=" + new String(header.value());