1 package de.juplo.kafka.wordcount.top10;
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
9 import java.util.Properties;
10 import java.util.function.BiConsumer;
11 import java.util.stream.Collectors;
13 import static org.assertj.core.api.Assertions.assertThat;
// Test-input helper: takes a BiConsumer<Key, Counter> and lists a fixed series of
// Key.of(user, word) / Counter.of(user, word, count) pairs for the users "peter" and "klaus".
// In the visible rows, the third Counter.of argument goes up by one each time the same
// user/word combination reappears (e.g. "klaus"/"Müsch": 1, then 2).
// NOTE(review): the statements that hand each pair to `consumer` are not visible in this
// extract - presumably one consumer.accept(key, counter) call per pair; confirm against the file.
18 static void writeInputData(BiConsumer<Key, Counter> consumer)
21 Key.of("peter","Hallo"),
22 Counter.of("peter","Hallo",1));
24 Key.of("klaus","Müsch"),
25 Counter.of("klaus","Müsch",1));
27 Key.of("peter","Welt"),
28 Counter.of("peter","Welt",1));
30 Key.of("klaus","Müsch"),
31 Counter.of("klaus","Müsch",2));
// NOTE(review): the Key argument paired with this Counter is not visible here -
// presumably it matches the Counter's user/word ("klaus"/"s"); confirm.
34 Counter.of("klaus","s",1));
36 Key.of("peter","Boäh"),
37 Counter.of("peter","Boäh",1));
39 Key.of("peter","Welt"),
40 Counter.of("peter","Welt",2));
42 Key.of("peter","Boäh"),
43 Counter.of("peter","Boäh",2));
// NOTE(review): Key argument not visible in this extract (see the first "klaus"/"s" row).
46 Counter.of("klaus","s",2));
48 Key.of("peter","Boäh"),
49 Counter.of("peter","Boäh",3));
// NOTE(review): Key argument not visible in this extract (see the first "klaus"/"s" row).
52 Counter.of("klaus","s",3));
// Verifies the messages received from the output side of the test.
// Checks that exactly 11 messages arrived, then makes two containsSubsequence assertions
// against elements of expectedMessages: the visible last argument of the first is index 7
// (commented "peter"), the visible last argument of the second is index 10 (commented "klaus").
// NOTE(review): the remaining arguments of both containsSubsequence calls are not visible
// in this extract - presumably the other per-user entries of expectedMessages; confirm.
55 static void assertExpectedResult(List<KeyValue<String, Ranking>> receivedMessages)
// The total count is asserted separately from the ordering checks below.
57 assertThat(receivedMessages).hasSize(11);
58 assertThat(receivedMessages).containsSubsequence(
63 expectedMessages[7]); // peter
65 assertThat(receivedMessages).containsSubsequence(
70 expectedMessages[10]); // klaus
// Expected output records; assertExpectedResult refers to them by array index
// (indices 7 and 10 are visible there).
// The field is typed KeyValue<String, Ranking>[] but created with the raw `new KeyValue[]`,
// so the assignment is an unchecked conversion.
// The `1l`, `2l`, `3l` arguments are long literals written with a lowercase `l` suffix.
// NOTE(review): the lines that wrap each group of Entry.of(...) rows into a KeyValue are not
// visible in this extract, and some groups appear to be missing rows (e.g. after a trailing
// comma) - which user each group belongs to cannot be confirmed from here.
73 static KeyValue<String, Ranking>[] expectedMessages = new KeyValue[]
78 Entry.of("Hallo", 1l))),
82 Entry.of("Müsch", 1l))),
86 Entry.of("Hallo", 1l),
87 Entry.of("Welt", 1l))),
91 Entry.of("Müsch", 2l))),
95 Entry.of("Müsch", 2l),
100 Entry.of("Hallo", 1l),
101 Entry.of("Welt", 1l),
102 Entry.of("Boäh", 1l))),
106 Entry.of("Welt", 2l),
107 Entry.of("Hallo", 1l),
108 Entry.of("Boäh", 1l))),
112 Entry.of("Welt", 2l),
113 Entry.of("Boäh", 2l),
114 Entry.of("Hallo", 1l))),
118 Entry.of("Müsch", 2l),
123 Entry.of("Boäh", 3l),
124 Entry.of("Welt", 2l),
125 Entry.of("Hallo", 1l))),
130 Entry.of("Müsch", 2l))),
// Converts a java.util.Properties instance into a Map<String, Object>.
// The two visible lambdas are the key mapper (casts each entry key to String, so a
// non-String key would fail with ClassCastException) and the value mapper (passes the
// entry value through unchanged).
// NOTE(review): the call these lambdas are passed to is not visible in this extract -
// presumably Collectors.toMap over the entries of `properties`; confirm against the file.
133 static Map<String, Object> convertToMap(Properties properties)
140 entry -> (String)entry.getKey(),
141 entry -> entry.getValue()
145 static String parseHeader(Headers headers, String key)
147 Header header = headers.lastHeader(key);
150 return key + "=null";
154 return key + "=" + new String(header.value());