1 package de.juplo.kafka.wordcount.counter;
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
6 import org.springframework.util.LinkedMultiValueMap;
7 import org.springframework.util.MultiValueMap;
10 import java.util.Properties;
11 import java.util.stream.Collectors;
12 import java.util.stream.Stream;
14 import static org.assertj.core.api.Assertions.assertThat;
19 static final Word[] INPUT_MESSAGES = new Word[]
21 Word.of("peter","Hallo"),
22 Word.of("klaus","Müsch"),
23 Word.of("peter","Welt"),
24 Word.of("klaus","Müsch"),
26 Word.of("peter","Boäh"),
27 Word.of("peter","Welt"),
28 Word.of("peter","Boäh"),
30 Word.of("peter","Boäh"),
34 static void assertExpectedMessages(MultiValueMap<Word, WordCounter> receivedMessages)
36 expectedMessages().forEach(
38 assertThat(receivedMessages.get(word))
39 .containsExactlyElementsOf(counter));
42 static final KeyValue<Word, WordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
45 Word.of("peter","Hallo"),
46 WordCounter.of("peter","Hallo",1)),
48 Word.of("klaus","Müsch"),
49 WordCounter.of("klaus","Müsch",1)),
51 Word.of("peter","Welt"),
52 WordCounter.of("peter","Welt",1)),
54 Word.of("klaus","Müsch"),
55 WordCounter.of("klaus","Müsch",2)),
58 WordCounter.of("klaus","s",1)),
60 Word.of("peter","Boäh"),
61 WordCounter.of("peter","Boäh",1)),
63 Word.of("peter","Welt"),
64 WordCounter.of("peter","Welt",2)),
66 Word.of("peter","Boäh"),
67 WordCounter.of("peter","Boäh",2)),
70 WordCounter.of("klaus","s",2)),
72 Word.of("peter","Boäh"),
73 WordCounter.of("peter","Boäh",3)),
76 WordCounter.of("klaus","s",3)),
79 static MultiValueMap<Word, WordCounter> expectedMessages()
81 MultiValueMap<Word, WordCounter> expectedMessages = new LinkedMultiValueMap<>();
83 .of(EXPECTED_MESSAGES)
84 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
85 return expectedMessages;
88 static Map<String, Object> convertToMap(Properties properties)
95 entry -> (String)entry.getKey(),
96 entry -> entry.getValue()
100 static String parseHeader(Headers headers, String key)
102 Header header = headers.lastHeader(key);
105 return key + "=null";
109 return key + "=" + new String(header.value());