1 package de.juplo.kafka.wordcount.counter;
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
6 import org.springframework.util.LinkedMultiValueMap;
7 import org.springframework.util.MultiValueMap;
10 import java.util.Properties;
11 import java.util.function.BiConsumer;
12 import java.util.stream.Collectors;
13 import java.util.stream.Stream;
15 import static org.assertj.core.api.Assertions.assertThat;
20 static void injectInputMessages(BiConsumer<String, Word> consumer)
24 .forEach(word -> consumer.accept(word.getUser(), word));
27 static final Word[] INPUT_MESSAGES = new Word[]
29 Word.of("peter","Hallo"),
30 Word.of("klaus","Müsch"),
31 Word.of("peter","Welt"),
32 Word.of("klaus","Müsch"),
34 Word.of("peter","Boäh"),
35 Word.of("peter","Welt"),
36 Word.of("peter","Boäh"),
38 Word.of("peter","Boäh"),
42 static void assertExpectedMessages(MultiValueMap<Word, WordCounter> receivedMessages)
44 expectedMessages.forEach(
46 assertThat(receivedMessages.get(word))
47 .containsExactlyElementsOf(counter));
50 static final KeyValue<Word, WordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
53 Word.of("peter","Hallo"),
54 WordCounter.of("peter","Hallo",1)),
56 Word.of("klaus","Müsch"),
57 WordCounter.of("klaus","Müsch",1)),
59 Word.of("peter","Welt"),
60 WordCounter.of("peter","Welt",1)),
62 Word.of("klaus","Müsch"),
63 WordCounter.of("klaus","Müsch",2)),
66 WordCounter.of("klaus","s",1)),
68 Word.of("peter","Boäh"),
69 WordCounter.of("peter","Boäh",1)),
71 Word.of("peter","Welt"),
72 WordCounter.of("peter","Welt",2)),
74 Word.of("peter","Boäh"),
75 WordCounter.of("peter","Boäh",2)),
78 WordCounter.of("klaus","s",2)),
80 Word.of("peter","Boäh"),
81 WordCounter.of("peter","Boäh",3)),
84 WordCounter.of("klaus","s",3)),
87 static MultiValueMap<Word, WordCounter> expectedMessages;
90 expectedMessages = new LinkedMultiValueMap<>();
92 .of(EXPECTED_MESSAGES)
93 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
96 static Map<String, Object> convertToMap(Properties properties)
103 entry -> (String)entry.getKey(),
104 entry -> entry.getValue()
108 static String parseHeader(Headers headers, String key)
110 Header header = headers.lastHeader(key);
113 return key + "=null";
117 return key + "=" + new String(header.value());