1 package de.juplo.kafka.wordcount.counter;
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
9 import java.util.Properties;
10 import java.util.function.BiConsumer;
11 import java.util.stream.Collectors;
13 import static org.assertj.core.api.Assertions.assertThat;
18 static void writeInputData(BiConsumer<String, Word> consumer)
22 Word.of("peter","Hallo"));
25 Word.of("klaus","Müsch"));
28 Word.of("peter","Welt"));
31 Word.of("klaus","Müsch"));
34 Word.of("klaus","s"));
37 Word.of("peter","Boäh"));
40 Word.of("peter","Welt"));
43 Word.of("peter","Boäh"));
46 Word.of("klaus","s"));
49 Word.of("peter","Boäh"));
52 Word.of("klaus","s"));
55 static void assertExpectedResult(List<KeyValue<Word, WordCounter>> receivedMessages)
57 assertThat(receivedMessages).hasSize(11);
58 assertThat(receivedMessages).containsSubsequence(
59 expectedMessages[0]); // Hallo
60 assertThat(receivedMessages).containsSubsequence(
62 expectedMessages[3]); // Müsch
63 assertThat(receivedMessages).containsSubsequence(
66 assertThat(receivedMessages).containsSubsequence(
69 expectedMessages[10]); // s
70 assertThat(receivedMessages).containsSubsequence(
73 expectedMessages[9]); // Boäh
76 static KeyValue<Word, WordCounter>[] expectedMessages = new KeyValue[]
79 Word.of("peter","Hallo"),
80 WordCounter.of("peter","Hallo",1)),
82 Word.of("klaus","Müsch"),
83 WordCounter.of("klaus","Müsch",1)),
85 Word.of("peter","Welt"),
86 WordCounter.of("peter","Welt",1)),
88 Word.of("klaus","Müsch"),
89 WordCounter.of("klaus","Müsch",2)),
92 WordCounter.of("klaus","s",1)),
94 Word.of("peter","Boäh"),
95 WordCounter.of("peter","Boäh",1)),
97 Word.of("peter","Welt"),
98 WordCounter.of("peter","Welt",2)),
100 Word.of("peter","Boäh"),
101 WordCounter.of("peter","Boäh",2)),
103 Word.of("klaus","s"),
104 WordCounter.of("klaus","s",2)),
106 Word.of("peter","Boäh"),
107 WordCounter.of("peter","Boäh",3)),
109 Word.of("klaus","s"),
110 WordCounter.of("klaus","s",3)),
113 static Map<String, Object> convertToMap(Properties properties)
120 entry -> (String)entry.getKey(),
121 entry -> entry.getValue()
125 static String parseHeader(Headers headers, String key)
127 Header header = headers.lastHeader(key);
130 return key + "=null";
134 return key + "=" + new String(header.value());