1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
13 import java.util.Properties;
14 import java.util.stream.Collectors;
15 import java.util.stream.Stream;
17 import static org.assertj.core.api.Assertions.assertThat;
22 static final User PETER = User.of("peter");
23 static final User KLAUS = User.of("klaus");
25 static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
28 TestWord.of(PETER.getUser(),"Hallo"),
29 TestCounter.of(PETER.getUser(),"Hallo",1)),
31 TestWord.of(KLAUS.getUser(),"Müsch"),
32 TestCounter.of(KLAUS.getUser(),"Müsch",1)),
34 TestWord.of(PETER.getUser(),"Welt"),
35 TestCounter.of(PETER.getUser(),"Welt",1)),
37 TestWord.of(KLAUS.getUser(),"Müsch"),
38 TestCounter.of(KLAUS.getUser(),"Müsch",2)),
40 TestWord.of(KLAUS.getUser(),"s"),
41 TestCounter.of(KLAUS.getUser(),"s",1)),
43 TestWord.of(PETER.getUser(),"Boäh"),
44 TestCounter.of(PETER.getUser(),"Boäh",1)),
46 TestWord.of(PETER.getUser(),"Welt"),
47 TestCounter.of(PETER.getUser(),"Welt",2)),
49 TestWord.of(PETER.getUser(),"Boäh"),
50 TestCounter.of(PETER.getUser(),"Boäh",2)),
52 TestWord.of(KLAUS.getUser(),"s"),
53 TestCounter.of(KLAUS.getUser(),"s",2)),
55 TestWord.of(PETER.getUser(),"Boäh"),
56 TestCounter.of(PETER.getUser(),"Boäh",3)),
58 TestWord.of(KLAUS.getUser(),"s"),
59 TestCounter.of(KLAUS.getUser(),"s",3)),
62 static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
64 expectedMessages().forEach(
66 assertThat(receivedMessages.get(user))
67 .containsExactlyElementsOf(rankings));
70 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
72 assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
73 assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
76 static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
81 Entry.of("Hallo", 1l))),
85 Entry.of("Müsch", 1l))),
89 Entry.of("Hallo", 1l),
90 Entry.of("Welt", 1l))),
94 Entry.of("Müsch", 2l))),
98 Entry.of("Müsch", 2l),
103 Entry.of("Hallo", 1l),
104 Entry.of("Welt", 1l),
105 Entry.of("Boäh", 1l))),
109 Entry.of("Welt", 2l),
110 Entry.of("Hallo", 1l),
111 Entry.of("Boäh", 1l))),
115 Entry.of("Welt", 2l),
116 Entry.of("Boäh", 2l),
117 Entry.of("Hallo", 1l))),
121 Entry.of("Müsch", 2l),
126 Entry.of("Boäh", 3l),
127 Entry.of("Welt", 2l),
128 Entry.of("Hallo", 1l))),
133 Entry.of("Müsch", 2l))),
136 static MultiValueMap<User, Ranking> expectedMessages()
138 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
140 .of(EXPECTED_MESSAGES)
141 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
142 return expectedMessages;
145 static Map<String, Object> convertToMap(Properties properties)
152 entry -> (String)entry.getKey(),
153 entry -> entry.getValue()
157 static String parseHeader(Headers headers, String key)
159 Header header = headers.lastHeader(key);
162 return key + "=null";
166 return key + "=" + new String(header.value());