1 package de.juplo.kafka.wordcount.counter;
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
6 import org.springframework.util.LinkedMultiValueMap;
7 import org.springframework.util.MultiValueMap;
10 import java.util.Properties;
11 import java.util.function.BiConsumer;
12 import java.util.stream.Collectors;
13 import java.util.stream.Stream;
15 import static org.assertj.core.api.Assertions.assertThat;
// Test fixture: the Word values used as input, listed in order.
// NOTE(review): only the Word.of(...) argument lines are visible here; the statements that
// presumably pass each Word (together with a String key) to `consumer` are not shown.
// Confirm against the full file before relying on this description.
20 static void writeInputData(BiConsumer<String, Word> consumer)
24 Word.of("peter","Hallo"));
27 Word.of("klaus","Müsch"));
30 Word.of("peter","Welt"));
33 Word.of("klaus","Müsch"));
36 Word.of("klaus","s"));
39 Word.of("peter","Boäh"));
42 Word.of("peter","Welt"));
45 Word.of("peter","Boäh"));
48 Word.of("klaus","s"));
51 Word.of("peter","Boäh"));
54 Word.of("klaus","s"));
57 static void assertExpectedResult(MultiValueMap<Word, WordCounter> receivedMessages)
59 expectedMessages.forEach(
61 assertThat(receivedMessages.get(word))
62 .containsExactlyElementsOf(counter));
65 static KeyValue<Word, WordCounter>[] expectedMessagesArray = new KeyValue[]
68 Word.of("peter","Hallo"),
69 WordCounter.of("peter","Hallo",1)),
71 Word.of("klaus","Müsch"),
72 WordCounter.of("klaus","Müsch",1)),
74 Word.of("peter","Welt"),
75 WordCounter.of("peter","Welt",1)),
77 Word.of("klaus","Müsch"),
78 WordCounter.of("klaus","Müsch",2)),
81 WordCounter.of("klaus","s",1)),
83 Word.of("peter","Boäh"),
84 WordCounter.of("peter","Boäh",1)),
86 Word.of("peter","Welt"),
87 WordCounter.of("peter","Welt",2)),
89 Word.of("peter","Boäh"),
90 WordCounter.of("peter","Boäh",2)),
93 WordCounter.of("klaus","s",2)),
95 Word.of("peter","Boäh"),
96 WordCounter.of("peter","Boäh",3)),
99 WordCounter.of("klaus","s",3)),
102 static MultiValueMap<Word, WordCounter> expectedMessages;
105 expectedMessages = new LinkedMultiValueMap<>();
107 .of(expectedMessagesArray)
108 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
111 static Map<String, Object> convertToMap(Properties properties)
118 entry -> (String)entry.getKey(),
119 entry -> entry.getValue()
123 static String parseHeader(Headers headers, String key)
125 Header header = headers.lastHeader(key);
128 return key + "=null";
132 return key + "=" + new String(header.value());