1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
13 import java.util.Properties;
14 import java.util.stream.Collectors;
15 import java.util.stream.Stream;
17 import static org.assertj.core.api.Assertions.assertThat;
/** Test user created via {@code User.of("peter")}. */
22 static final User PETER = User.of("peter");
/** Test user created via {@code User.of("klaus")}. */
23 static final User KLAUS = User.of("klaus");
/**
 * Input fixture: an array of key/value records typed as
 * {@code KeyValue<TestWord, TestCounter>}.
 *
 * Each visible {@code TestWord} is built from a user name and a word and is
 * followed by a {@code TestCounter} built from the same user name and word
 * plus a count. When a (user, word) combination reappears, its count is one
 * higher than at its previous occurrence (e.g. "Müsch" for KLAUS: 1, then 2).
 *
 * NOTE(review): the array is created as a raw {@code new KeyValue[]}, which
 * is an unchecked generic array creation.
 */
25 static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
28 TestWord.of(PETER.getUser(),"Hallo"),
29 TestCounter.of(PETER.getUser(),"Hallo",1)),
31 TestWord.of(KLAUS.getUser(),"Müsch"),
32 TestCounter.of(KLAUS.getUser(),"Müsch",1)),
34 TestWord.of(PETER.getUser(),"Welt"),
35 TestCounter.of(PETER.getUser(),"Welt",1)),
37 TestWord.of(KLAUS.getUser(),"Müsch"),
38 TestCounter.of(KLAUS.getUser(),"Müsch",2)),
40 TestWord.of(KLAUS.getUser(),"s"),
41 TestCounter.of(KLAUS.getUser(),"s",1)),
43 TestWord.of(PETER.getUser(),"Boäh"),
44 TestCounter.of(PETER.getUser(),"Boäh",1)),
46 TestWord.of(PETER.getUser(),"Welt"),
47 TestCounter.of(PETER.getUser(),"Welt",2)),
49 TestWord.of(PETER.getUser(),"Boäh"),
50 TestCounter.of(PETER.getUser(),"Boäh",2)),
52 TestWord.of(KLAUS.getUser(),"s"),
53 TestCounter.of(KLAUS.getUser(),"s",2)),
55 TestWord.of(PETER.getUser(),"Boäh"),
56 TestCounter.of(PETER.getUser(),"Boäh",3)),
58 TestWord.of(KLAUS.getUser(),"s"),
59 TestCounter.of(KLAUS.getUser(),"s",3)),
62 static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
64 expectedMessages().forEach(
66 assertThat(receivedMessages.get(user))
67 .containsExactlyElementsOf(rankings));
70 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
72 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
73 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
76 static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
78 assertThat(ranking).isEqualTo(getLastMessageFor(user));
81 static Ranking getLastMessageFor(User user)
83 return getLastMessageFor(user, expectedMessages());
/**
 * Picks the last ranking for a user out of the given per-user messages.
 *
 * NOTE(review): the steps between {@code messagesForUsers} and the final
 * {@code reduce} are not visible in this excerpt; presumably the user's
 * list is fetched and streamed - confirm against the full file.
 *
 * @param user the user whose messages are inspected
 * @param messagesForUsers rankings grouped by user
 * @return the result of the reduction below
 */
86 static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
88 return messagesForUsers
// Identity null plus "keep the right operand": the reduction yields the last
// element, or null when there is nothing to reduce.
91 .reduce(null, (left, right) -> right);
/**
 * Expected output fixture: an array of key/value records typed as
 * {@code KeyValue<User, Ranking>}, consumed by {@code expectedMessages()}.
 *
 * In the visible lines, entries with a higher count are listed before
 * entries with a lower count (e.g. "Boäh" 3, "Welt" 2, "Hallo" 1).
 *
 * NOTE(review): unlike INPUT_MESSAGES this field is not declared final -
 * confirm whether that is intentional.
 * NOTE(review): the long literals use a lowercase {@code l} suffix, which is
 * easy to misread as the digit 1; {@code 1L} is the usual spelling.
 */
94 static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
99 Entry.of("Hallo", 1l))),
103 Entry.of("Müsch", 1l))),
107 Entry.of("Hallo", 1l),
108 Entry.of("Welt", 1l))),
112 Entry.of("Müsch", 2l))),
116 Entry.of("Müsch", 2l),
121 Entry.of("Hallo", 1l),
122 Entry.of("Welt", 1l),
123 Entry.of("Boäh", 1l))),
127 Entry.of("Welt", 2l),
128 Entry.of("Hallo", 1l),
129 Entry.of("Boäh", 1l))),
133 Entry.of("Welt", 2l),
134 Entry.of("Boäh", 2l),
135 Entry.of("Hallo", 1l))),
139 Entry.of("Müsch", 2l),
144 Entry.of("Boäh", 3l),
145 Entry.of("Welt", 2l),
146 Entry.of("Hallo", 1l))),
151 Entry.of("Müsch", 2l))),
154 static MultiValueMap<User, Ranking> expectedMessages()
156 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
158 .of(EXPECTED_MESSAGES)
159 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
160 return expectedMessages;
/**
 * Converts the given {@code Properties} into a {@code Map<String, Object>}.
 *
 * NOTE(review): the statements that iterate the properties and collect the
 * result are not visible in this excerpt; only the key and value mapping
 * lambdas are - confirm the rest against the full file.
 *
 * @param properties the properties to convert
 * @return a map built with the key and value mappers below
 */
163 static Map<String, Object> convertToMap(Properties properties)
// Key mapper: the entry key is cast to String.
170 entry -> (String)entry.getKey(),
// Value mapper: the entry value is passed through unchanged.
171 entry -> entry.getValue()
175 static String parseHeader(Headers headers, String key)
177 Header header = headers.lastHeader(key);
180 return key + "=null";
184 return key + "=" + new String(header.value());