1 package de.juplo.kafka.wordcount.top10;
3 import de.juplo.kafka.wordcount.counter.TestCounter;
4 import de.juplo.kafka.wordcount.counter.TestWord;
5 import org.apache.kafka.common.header.Header;
6 import org.apache.kafka.common.header.Headers;
7 import org.apache.kafka.streams.KeyValue;
8 import org.springframework.util.LinkedMultiValueMap;
9 import org.springframework.util.MultiValueMap;
12 import java.util.Properties;
13 import java.util.stream.Collectors;
14 import java.util.stream.Stream;
16 import static org.assertj.core.api.Assertions.assertThat;
// Input fixture: word/counter pairs (TestWord key, TestCounter value) for the
// users "peter" and "klaus". Whenever a (user, word) pair shows up again, its
// counter value is one higher than in its previous appearance.
// NOTE(review): this listing is fragmentary -- the call that wraps each
// key/value pair and the array braces are not visible here.
// NOTE(review): `new KeyValue[]` creates a raw array that is assigned to a
// generic array type, which yields an unchecked warning.
21 static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
24 TestWord.of("peter","Hallo"),
25 TestCounter.of("peter","Hallo",1)),
27 TestWord.of("klaus","Müsch"),
28 TestCounter.of("klaus","Müsch",1)),
30 TestWord.of("peter","Welt"),
31 TestCounter.of("peter","Welt",1)),
33 TestWord.of("klaus","Müsch"),
34 TestCounter.of("klaus","Müsch",2)),
36 TestWord.of("klaus","s"),
37 TestCounter.of("klaus","s",1)),
39 TestWord.of("peter","Boäh"),
40 TestCounter.of("peter","Boäh",1)),
42 TestWord.of("peter","Welt"),
43 TestCounter.of("peter","Welt",2)),
45 TestWord.of("peter","Boäh"),
46 TestCounter.of("peter","Boäh",2)),
48 TestWord.of("klaus","s"),
49 TestCounter.of("klaus","s",2)),
51 TestWord.of("peter","Boäh"),
52 TestCounter.of("peter","Boäh",3)),
54 TestWord.of("klaus","s"),
55 TestCounter.of("klaus","s",3)),
// Checks the received rankings against expectedMessages(): for every entry of
// the expected map, the value looked up in receivedMessages must contain
// exactly the expected elements (AssertJ containsExactlyElementsOf, which also
// checks the order).
// NOTE(review): the lambda header that binds `user` and `rankings` is not
// visible in this fragment; presumably they are the map key and its list of
// rankings -- confirm.
58 static void assertExpectedMessages(MultiValueMap<String, Ranking> receivedMessages)
60 expectedMessages().forEach(
62 assertThat(receivedMessages.get(user))
63 .containsExactlyElementsOf(rankings));
// Expected output fixture: an array of KeyValue<String, Ranking>. Only the
// Entry.of(word, count) arguments are visible in this fragment; the keys, the
// calls that build each Ranking and some entries are missing from the listing.
// NOTE(review): unlike INPUT_MESSAGES this field is not declared `final`.
// NOTE(review): the lowercase `l` suffix on the long literals is easily
// misread as the digit 1; the uppercase `L` suffix is the usual spelling.
66 static KeyValue<String, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
71 Entry.of("Hallo", 1l))),
75 Entry.of("Müsch", 1l))),
79 Entry.of("Hallo", 1l),
80 Entry.of("Welt", 1l))),
84 Entry.of("Müsch", 2l))),
88 Entry.of("Müsch", 2l),
93 Entry.of("Hallo", 1l),
95 Entry.of("Boäh", 1l))),
100 Entry.of("Hallo", 1l),
101 Entry.of("Boäh", 1l))),
105 Entry.of("Welt", 2l),
106 Entry.of("Boäh", 2l),
107 Entry.of("Hallo", 1l))),
111 Entry.of("Müsch", 2l),
116 Entry.of("Boäh", 3l),
117 Entry.of("Welt", 2l),
118 Entry.of("Hallo", 1l))),
123 Entry.of("Müsch", 2l))),
// Builds the expected messages as a multi-value map: each key/value pair of
// EXPECTED_MESSAGES is added under its key to a fresh LinkedMultiValueMap,
// which is then returned.
// NOTE(review): the receiver of `.of(EXPECTED_MESSAGES)` is not visible in
// this fragment; presumably it is java.util.stream.Stream -- confirm.
126 static MultiValueMap<String, Ranking> expectedMessages()
128 MultiValueMap<String, Ranking> expectedMessages = new LinkedMultiValueMap<>();
130 .of(EXPECTED_MESSAGES)
131 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
132 return expectedMessages;
// Converts a Properties object into a Map<String, Object>.
// The two visible mapper lambdas cast each entry key to String and pass the
// entry value through unchanged.
// NOTE(review): the pipeline around the lambdas is not visible in this
// fragment; presumably they feed Collectors.toMap over the entry set -- confirm.
135 static Map<String, Object> convertToMap(Properties properties)
142 entry -> (String)entry.getKey(),
143 entry -> entry.getValue()
147 static String parseHeader(Headers headers, String key)
149 Header header = headers.lastHeader(key);
152 return key + "=null";
156 return key + "=" + new String(header.value());