1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
13 import java.util.Properties;
14 import java.util.stream.Collectors;
15 import java.util.stream.Stream;
17 import static org.assertj.core.api.Assertions.assertThat;
22 static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
25 TestWord.of("peter","Hallo"),
26 TestCounter.of("peter","Hallo",1)),
28 TestWord.of("klaus","Müsch"),
29 TestCounter.of("klaus","Müsch",1)),
31 TestWord.of("peter","Welt"),
32 TestCounter.of("peter","Welt",1)),
34 TestWord.of("klaus","Müsch"),
35 TestCounter.of("klaus","Müsch",2)),
37 TestWord.of("klaus","s"),
38 TestCounter.of("klaus","s",1)),
40 TestWord.of("peter","Boäh"),
41 TestCounter.of("peter","Boäh",1)),
43 TestWord.of("peter","Welt"),
44 TestCounter.of("peter","Welt",2)),
46 TestWord.of("peter","Boäh"),
47 TestCounter.of("peter","Boäh",2)),
49 TestWord.of("klaus","s"),
50 TestCounter.of("klaus","s",2)),
52 TestWord.of("peter","Boäh"),
53 TestCounter.of("peter","Boäh",3)),
55 TestWord.of("klaus","s"),
56 TestCounter.of("klaus","s",3)),
59 static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
61 expectedMessages().forEach(
63 assertThat(receivedMessages.get(user))
64 .containsExactlyElementsOf(rankings));
67 static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
69 assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
70 assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
73 static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
78 Entry.of("Hallo", 1l))),
82 Entry.of("Müsch", 1l))),
86 Entry.of("Hallo", 1l),
87 Entry.of("Welt", 1l))),
91 Entry.of("Müsch", 2l))),
95 Entry.of("Müsch", 2l),
100 Entry.of("Hallo", 1l),
101 Entry.of("Welt", 1l),
102 Entry.of("Boäh", 1l))),
106 Entry.of("Welt", 2l),
107 Entry.of("Hallo", 1l),
108 Entry.of("Boäh", 1l))),
112 Entry.of("Welt", 2l),
113 Entry.of("Boäh", 2l),
114 Entry.of("Hallo", 1l))),
118 Entry.of("Müsch", 2l),
123 Entry.of("Boäh", 3l),
124 Entry.of("Welt", 2l),
125 Entry.of("Hallo", 1l))),
130 Entry.of("Müsch", 2l))),
133 static MultiValueMap<User, Ranking> expectedMessages()
135 MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
137 .of(EXPECTED_MESSAGES)
138 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
139 return expectedMessages;
142 static Map<String, Object> convertToMap(Properties properties)
149 entry -> (String)entry.getKey(),
150 entry -> entry.getValue()
154 static String parseHeader(Headers headers, String key)
156 Header header = headers.lastHeader(key);
159 return key + "=null";
163 return key + "=" + new String(header.value());