1 package de.juplo.kafka.wordcount.top10;
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
9 import java.util.Properties;
10 import java.util.function.BiConsumer;
11 import java.util.stream.Collectors;
13 import static org.assertj.core.api.Assertions.assertThat;
/**
 * Builds the fixed sequence of {@link Key} / {@link Counter} pairs that
 * serves as input data for the test.
 * <p>
 * The last argument of {@code Counter.of} grows by one each time the same
 * pair of strings recurs (for example "Boäh": 1, 2, 3).
 * <p>
 * NOTE(review): the statements that hand each pair to {@code consumer} are
 * not visible in this excerpt -- presumably {@code consumer.accept(...)};
 * confirm against the complete file.
 *
 * @param consumer the callback the pairs are intended for
 */
18 static void writeInputData(BiConsumer<Key, Counter> consumer)
// NOTE(review): arguments look like (user, word[, count]) -- TODO confirm
// against the Key and Counter classes.
21 Key.of("peter","Hallo"),
22 Counter.of("peter","Hallo",1));
24 Key.of("klaus","Müsch"),
25 Counter.of("klaus","Müsch",1));
27 Key.of("peter","Welt"),
28 Counter.of("peter","Welt",1));
// Second occurrence of ("klaus","Müsch"): counter value 2.
30 Key.of("klaus","Müsch"),
31 Counter.of("klaus","Müsch",2));
// The Key line that belongs to this Counter is not visible in this excerpt.
34 Counter.of("klaus","s",1));
36 Key.of("peter","Boäh"),
37 Counter.of("peter","Boäh",1));
39 Key.of("peter","Welt"),
40 Counter.of("peter","Welt",2));
42 Key.of("peter","Boäh"),
43 Counter.of("peter","Boäh",2));
// The Key line that belongs to this Counter is not visible in this excerpt.
46 Counter.of("klaus","s",2));
48 Key.of("peter","Boäh"),
49 Counter.of("peter","Boäh",3));
// The Key line that belongs to this Counter is not visible in this excerpt.
52 Counter.of("klaus","s",3));
/**
 * Verifies the messages produced by the system under test.
 * <p>
 * Asserts that exactly 11 messages were received and that the received list
 * contains several subsequences made up of {@code expectedMessages}
 * elements. AssertJ's {@code containsSubsequence} requires the given
 * elements to occur in the given order, but allows other elements between
 * them.
 * <p>
 * NOTE(review): some arguments of the {@code containsSubsequence} calls are
 * not visible in this excerpt; only the visible indices are annotated below.
 *
 * @param receivedMessages the key/value pairs to check
 */
55 static void assertExpectedResult(List<KeyValue<String, Ranking>> receivedMessages)
// Total number of received messages must match exactly.
57 assertThat(receivedMessages).hasSize(11);
58 assertThat(receivedMessages).containsSubsequence(
59 expectedMessages[0]); // Hallo
60 assertThat(receivedMessages).containsSubsequence(
62 expectedMessages[3]); // Müsch
// The arguments of the next call are not visible in this excerpt.
63 assertThat(receivedMessages).containsSubsequence(
66 assertThat(receivedMessages).containsSubsequence(
69 expectedMessages[10]); // s
70 assertThat(receivedMessages).containsSubsequence(
73 expectedMessages[9]); // Boäh
/**
 * The expected output messages, referenced by index from
 * {@code assertExpectedResult}.
 * <p>
 * Within each visible group the entries are listed by descending count
 * (for example "Boäh" 3, "Welt" 2, "Hallo" 1).
 * <p>
 * NOTE(review): the array is created with the raw type {@code KeyValue[]}
 * and assigned to a generic array type, which yields an unchecked warning.
 * NOTE(review): the lower-case {@code l} suffix of the long literals
 * ({@code 1l}) is easily misread as the digit one; {@code 1L} would be
 * clearer.
 * NOTE(review): the message keys, the wrapper calls around the entries and
 * some entries are not visible in this excerpt.
 */
76 static KeyValue<String, Ranking>[] expectedMessages = new KeyValue[]
81 Entry.of("Hallo", 1l))),
85 Entry.of("Müsch", 1l))),
89 Entry.of("Hallo", 1l),
90 Entry.of("Welt", 1l))),
94 Entry.of("Müsch", 2l))),
98 Entry.of("Müsch", 2l),
103 Entry.of("Hallo", 1l),
104 Entry.of("Welt", 1l),
105 Entry.of("Boäh", 1l))),
// "Welt" now has count 2 and is listed first.
109 Entry.of("Welt", 2l),
110 Entry.of("Hallo", 1l),
111 Entry.of("Boäh", 1l))),
115 Entry.of("Welt", 2l),
116 Entry.of("Boäh", 2l),
117 Entry.of("Hallo", 1l))),
121 Entry.of("Müsch", 2l),
// "Boäh" now has count 3 and is listed first.
126 Entry.of("Boäh", 3l),
127 Entry.of("Welt", 2l),
128 Entry.of("Hallo", 1l))),
133 Entry.of("Müsch", 2l))),
136 static Map<String, Object> convertToMap(Properties properties)
143 entry -> (String)entry.getKey(),
144 entry -> entry.getValue()
148 static String parseHeader(Headers headers, String key)
150 Header header = headers.lastHeader(key);
153 return key + "=null";
157 return key + "=" + new String(header.value());