1 package de.juplo.kafka.wordcount.top10;
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
9 import java.util.Properties;
10 import java.util.function.BiConsumer;
11 import java.util.stream.Collectors;
13 import static org.assertj.core.api.Assertions.assertThat;
18 static void writeInputData(BiConsumer<Key, Counter> consumer)
21 Key.of("peter","Hallo"),
22 Counter.of("peter","Hallo",1));
24 Key.of("klaus","Müsch"),
25 Counter.of("klaus","Müsch",1));
27 Key.of("peter","Welt"),
28 Counter.of("peter","Welt",1));
30 Key.of("klaus","Müsch"),
31 Counter.of("klaus","Müsch",2));
34 Counter.of("klaus","s",1));
36 Key.of("peter","Boäh"),
37 Counter.of("peter","Boäh",1));
39 Key.of("peter","Welt"),
40 Counter.of("peter","Welt",2));
42 Key.of("peter","Boäh"),
43 Counter.of("peter","Boäh",2));
46 Counter.of("klaus","s",2));
48 Key.of("peter","Boäh"),
49 Counter.of("peter","Boäh",3));
52 Counter.of("klaus","s",3));
55 static void assertExpectedResult(List<KeyValue<String, Ranking>> receivedMessages)
57 assertThat(receivedMessages).hasSize(11);
58 assertThat(receivedMessages).containsSubsequence(
63 expectedMessages[7]); // peter
64 assertThat(receivedMessages).containsSubsequence(
69 expectedMessages[10]); // klaus
72 static KeyValue<String, Ranking>[] expectedMessages = new KeyValue[]
77 Entry.of("Hallo", 1l))),
81 Entry.of("Müsch", 1l))),
86 Entry.of("Hallo", 1l))),
90 Entry.of("Müsch", 2l))),
94 Entry.of("Müsch", 2l),
100 Entry.of("Hallo", 1l),
101 Entry.of("Welt", 1l))),
105 Entry.of("Welt", 2l),
106 Entry.of("Boäh", 1l),
107 Entry.of("Hallo", 1l))),
111 Entry.of("Welt", 2l),
112 Entry.of("Boäh", 2l),
113 Entry.of("Hallo", 1l))),
118 Entry.of("Müsch", 2l))),
122 Entry.of("Boäh", 3l),
123 Entry.of("Welt", 2l),
124 Entry.of("Hallo", 1l))),
129 Entry.of("Müsch", 2l))),
132 static Map<String, Object> convertToMap(Properties properties)
139 entry -> (String)entry.getKey(),
140 entry -> entry.getValue()
144 static String parseHeader(Headers headers, String key)
146 Header header = headers.lastHeader(key);
149 return key + "=null";
153 return key + "=" + new String(header.value());