1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
13 import java.util.Properties;
14 import java.util.stream.Collectors;
15 import java.util.stream.Stream;
17 import static org.assertj.core.api.Assertions.assertThat;
// The two users referenced by the input records and expected rankings in this class.
22 static final User PETER = User.of("peter");
23 static final User KLAUS = User.of("klaus");
/**
 * Input records for the test: each record is keyed by a {@code TestWord}
 * (user, word) and carries a {@code TestCounter} (user, word, number).
 * For a given user/word combination the number grows by one with every
 * repetition (presumably the running count of that word for that user).
 */
25 static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
28 TestWord.of(PETER.getUser(),"Hallo"),
29 TestCounter.of(PETER.getUser(),"Hallo",1)),
31 TestWord.of(KLAUS.getUser(),"Müsch"),
32 TestCounter.of(KLAUS.getUser(),"Müsch",1)),
34 TestWord.of(PETER.getUser(),"Welt"),
35 TestCounter.of(PETER.getUser(),"Welt",1)),
37 TestWord.of(KLAUS.getUser(),"Müsch"),
38 TestCounter.of(KLAUS.getUser(),"Müsch",2)),
40 TestWord.of(KLAUS.getUser(),"s"),
41 TestCounter.of(KLAUS.getUser(),"s",1)),
43 TestWord.of(PETER.getUser(),"Boäh"),
44 TestCounter.of(PETER.getUser(),"Boäh",1)),
46 TestWord.of(PETER.getUser(),"Welt"),
47 TestCounter.of(PETER.getUser(),"Welt",2)),
49 TestWord.of(PETER.getUser(),"Boäh"),
50 TestCounter.of(PETER.getUser(),"Boäh",2)),
52 TestWord.of(KLAUS.getUser(),"s"),
53 TestCounter.of(KLAUS.getUser(),"s",2)),
55 TestWord.of(PETER.getUser(),"Boäh"),
56 TestCounter.of(PETER.getUser(),"Boäh",3)),
58 TestWord.of(KLAUS.getUser(),"s"),
59 TestCounter.of(KLAUS.getUser(),"s",3)),
62 static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
64 expectedMessages().forEach(
66 assertThat(receivedMessages.get(user))
67 .containsExactlyElementsOf(rankings));
70 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
72 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
73 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
76 static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
78 assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
79 assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
82 static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
84 assertThat(ranking).isEqualTo(getLastMessageFor(user));
87 static Ranking getLastMessageFor(User user)
89 return getLastMessageFor(user, expectedMessages());
92 static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
94 return messagesForUsers
97 .reduce(null, (left, right) -> right);
/**
 * Expected output records (user to ranking), consumed by
 * {@code expectedMessages()}, which groups the rankings per user in
 * array order.
 */
// NOTE(review): unlike INPUT_MESSAGES this field is not declared final — confirm whether that is intended.
// NOTE(review): the long literals use a lowercase 'l' suffix, which is easily misread as the digit 1.
100 static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
105 Entry.of("Hallo", 1l))),
109 Entry.of("Müsch", 1l))),
113 Entry.of("Hallo", 1l),
114 Entry.of("Welt", 1l))),
118 Entry.of("Müsch", 2l))),
122 Entry.of("Müsch", 2l),
127 Entry.of("Hallo", 1l),
128 Entry.of("Welt", 1l),
129 Entry.of("Boäh", 1l))),
133 Entry.of("Welt", 2l),
134 Entry.of("Hallo", 1l),
135 Entry.of("Boäh", 1l))),
139 Entry.of("Welt", 2l),
140 Entry.of("Boäh", 2l),
141 Entry.of("Hallo", 1l))),
145 Entry.of("Müsch", 2l),
150 Entry.of("Boäh", 3l),
151 Entry.of("Welt", 2l),
152 Entry.of("Hallo", 1l))),
157 Entry.of("Müsch", 2l))),
160 static MultiValueMap<User, Ranking> expectedMessages()
162 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
164 .of(EXPECTED_MESSAGES)
165 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
166 return expectedMessages;
169 static Map<String, Object> convertToMap(Properties properties)
176 entry -> (String)entry.getKey(),
177 entry -> entry.getValue()
181 static String parseHeader(Headers headers, String key)
183 Header header = headers.lastHeader(key);
186 return key + "=null";
190 return key + "=" + new String(header.value());