1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
13 import java.util.Properties;
14 import java.util.stream.Collectors;
15 import java.util.stream.Stream;
17 import static org.assertj.core.api.Assertions.assertThat;
22 static final User PETER = User.of("peter");
23 static final User KLAUS = User.of("klaus");
25 static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
28 TestWord.of(PETER.getUser(),"Hallo"),
29 TestCounter.of(PETER.getUser(),"Hallo",1)),
31 TestWord.of(KLAUS.getUser(),"Müsch"),
32 TestCounter.of(KLAUS.getUser(),"Müsch",1)),
34 TestWord.of(PETER.getUser(),"Welt"),
35 TestCounter.of(PETER.getUser(),"Welt",1)),
37 TestWord.of(KLAUS.getUser(),"Müsch"),
38 TestCounter.of(KLAUS.getUser(),"Müsch",2)),
40 TestWord.of(KLAUS.getUser(),"s"),
41 TestCounter.of(KLAUS.getUser(),"s",1)),
43 TestWord.of(PETER.getUser(),"Boäh"),
44 TestCounter.of(PETER.getUser(),"Boäh",1)),
46 TestWord.of(PETER.getUser(),"Welt"),
47 TestCounter.of(PETER.getUser(),"Welt",2)),
49 TestWord.of(PETER.getUser(),"Boäh"),
50 TestCounter.of(PETER.getUser(),"Boäh",2)),
52 TestWord.of(KLAUS.getUser(),"s"),
53 TestCounter.of(KLAUS.getUser(),"s",2)),
55 TestWord.of(PETER.getUser(),"Boäh"),
56 TestCounter.of(PETER.getUser(),"Boäh",3)),
58 TestWord.of(KLAUS.getUser(),"s"),
59 TestCounter.of(KLAUS.getUser(),"s",3)),
62 static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
64 expectedMessages().forEach(
66 assertThat(receivedMessages.get(user))
67 .containsExactlyElementsOf(rankings));
70 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
72 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
73 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
76 static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
78 assertThat(countMessagesForUser(PETER, receivedMessages));
79 assertThat(countMessagesForUser(KLAUS, receivedMessages));
82 static int countMessagesForUser(User user, MultiValueMap<User, Ranking> messagesForUsers)
84 return messagesForUsers.get(user).size();
88 static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
90 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
91 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
94 static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
96 assertThat(ranking).isEqualTo(getLastMessageFor(user));
99 static Ranking getLastMessageFor(User user)
101 return getLastMessageFor(user, expectedMessages());
104 static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
106 return messagesForUsers
109 .reduce(null, (left, right) -> right);
112 static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
117 Entry.of("Hallo", 1l))),
121 Entry.of("Müsch", 1l))),
125 Entry.of("Hallo", 1l),
126 Entry.of("Welt", 1l))),
130 Entry.of("Müsch", 2l))),
134 Entry.of("Müsch", 2l),
139 Entry.of("Hallo", 1l),
140 Entry.of("Welt", 1l),
141 Entry.of("Boäh", 1l))),
145 Entry.of("Welt", 2l),
146 Entry.of("Hallo", 1l),
147 Entry.of("Boäh", 1l))),
151 Entry.of("Welt", 2l),
152 Entry.of("Boäh", 2l),
153 Entry.of("Hallo", 1l))),
157 Entry.of("Müsch", 2l),
162 Entry.of("Boäh", 3l),
163 Entry.of("Welt", 2l),
164 Entry.of("Hallo", 1l))),
169 Entry.of("Müsch", 2l))),
172 static MultiValueMap<User, Ranking> expectedMessages()
174 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
176 .of(EXPECTED_MESSAGES)
177 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
178 return expectedMessages;
181 static Map<String, Object> convertToMap(Properties properties)
188 entry -> (String)entry.getKey(),
189 entry -> entry.getValue()
193 static String parseHeader(Headers headers, String key)
195 Header header = headers.lastHeader(key);
198 return key + "=null";
202 return key + "=" + new String(header.value());