1 package de.juplo.kafka.wordcount.top10;
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
9 import java.util.Properties;
10 import java.util.function.BiConsumer;
11 import java.util.stream.Collectors;
13 import static org.assertj.core.api.Assertions.assertThat;
/**
 * Supplies the test input as pairs of {@code Key} and {@code Counter}
 * for the given {@code consumer}.
 *
 * The visible pairs are built with {@code Key.of(..)} and {@code Counter.of(..)}
 * for the names "peter" and "klaus". Whenever the same name/word combination
 * shows up again, the numeric argument of {@code Counter.of(..)} is one higher
 * than before (e.g. "Müsch": 1, 2 and "Boäh": 1, 2, 3).
 *
 * NOTE(review): the statements that hand each pair to {@code consumer}
 * (presumably {@code consumer.accept(key, counter)}) and the {@code Key.of(..)}
 * lines belonging to the "s" counters are not visible in this excerpt -- confirm
 * against the complete file.
 *
 * @param consumer receives the generated key/counter pairs
 */
18 static void writeInputData(BiConsumer<Key, Counter> consumer)
21 Key.of("peter","Hallo"),
22 Counter.of("peter","Hallo",1));
24 Key.of("klaus","Müsch"),
25 Counter.of("klaus","Müsch",1));
27 Key.of("peter","Welt"),
28 Counter.of("peter","Welt",1));
30 Key.of("klaus","Müsch"),
31 Counter.of("klaus","Müsch",2));
34 Counter.of("klaus","s",1));
36 Key.of("peter","Boäh"),
37 Counter.of("peter","Boäh",1));
39 Key.of("peter","Welt"),
40 Counter.of("peter","Welt",2));
42 Key.of("peter","Boäh"),
43 Counter.of("peter","Boäh",2));
46 Counter.of("klaus","s",2));
48 Key.of("peter","Boäh"),
49 Counter.of("peter","Boäh",3));
52 Counter.of("klaus","s",3));
/**
 * Checks the messages that were received from the application under test.
 *
 * First the total number of received messages must be exactly 11. After that,
 * several {@code containsSubsequence(..)} assertions compare the received
 * messages against entries of {@code expectedMessages}; the trailing comments
 * name the word each group of entries refers to.
 *
 * NOTE(review): {@code containsSubsequence} checks relative order only and
 * tolerates other elements in between -- presumably chosen because the
 * interleaving of messages for different words is not deterministic; confirm.
 * Most of the referenced array indices are not visible in this excerpt.
 *
 * @param receivedMessages the key/ranking pairs that were read from the output
 */
55 static void assertExpectedResult(List<KeyValue<String, Ranking>> receivedMessages)
57 assertThat(receivedMessages).hasSize(11);
58 assertThat(receivedMessages).containsSubsequence(
59 expectedMessages[0]); // Hallo
60 assertThat(receivedMessages).containsSubsequence(
62 expectedMessages[3]); // Müsch
63 assertThat(receivedMessages).containsSubsequence(
66 assertThat(receivedMessages).containsSubsequence(
69 expectedMessages[10]); // s
70 assertThat(receivedMessages).containsSubsequence(
73 expectedMessages[9]); // Boäh
/**
 * The messages that {@code assertExpectedResult(..)} looks up by index.
 *
 * The array is created as a raw {@code KeyValue[]} and assigned to a
 * {@code KeyValue<String, Ranking>[]} field, because Java does not allow the
 * creation of arrays with a parameterized component type.
 *
 * NOTE(review): only a single element is visible in this excerpt;
 * {@code assertExpectedResult(..)} reads indices up to 10, so the complete
 * array presumably holds 11 entries -- confirm against the complete file.
 */
76 static KeyValue<String, Ranking>[] expectedMessages = new KeyValue[]
80 Ranking.of("peter","Hallo",1)),
83 static Map<String, Object> convertToMap(Properties properties)
90 entry -> (String)entry.getKey(),
91 entry -> entry.getValue()
95 static String parseHeader(Headers headers, String key)
97 Header header = headers.lastHeader(key);
100 return key + "=null";
104 return key + "=" + new String(header.value());