1 package de.juplo.kafka.wordcount.counter;
3 import de.juplo.kafka.wordcount.splitter.TestInputWord;
4 import de.juplo.kafka.wordcount.top10.TestOutputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
6 import org.apache.kafka.common.header.Header;
7 import org.apache.kafka.common.header.Headers;
8 import org.apache.kafka.streams.KeyValue;
9 import org.springframework.util.LinkedMultiValueMap;
10 import org.springframework.util.MultiValueMap;
12 import java.util.stream.Stream;
14 import static org.assertj.core.api.Assertions.assertThat;
19 private static final TestInputWord[] INPUT_MESSAGES = new TestInputWord[]
21 TestInputWord.of("peter","Hallo"),
22 TestInputWord.of("klaus","Müsch"),
23 TestInputWord.of("peter","Welt"),
24 TestInputWord.of("klaus","Müsch"),
25 TestInputWord.of("klaus","s"),
26 TestInputWord.of("peter","Boäh"),
27 TestInputWord.of("peter","Welt"),
28 TestInputWord.of("peter","Boäh"),
29 TestInputWord.of("klaus","s"),
30 TestInputWord.of("peter","Boäh"),
31 TestInputWord.of("klaus","s"),
34 static Stream<TestInputWord> getInputMessages()
36 return Stream.of(TestData.INPUT_MESSAGES);
39 static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
41 expectedMessages().forEach(
43 assertThat(receivedMessages.get(word))
44 .containsExactlyElementsOf(counter));
47 private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
50 TestOutputWord.of("peter","Hallo"),
51 TestOutputWordCounter.of("peter","Hallo",1)),
53 TestOutputWord.of("klaus","Müsch"),
54 TestOutputWordCounter.of("klaus","Müsch",1)),
56 TestOutputWord.of("peter","Welt"),
57 TestOutputWordCounter.of("peter","Welt",1)),
59 TestOutputWord.of("klaus","Müsch"),
60 TestOutputWordCounter.of("klaus","Müsch",2)),
62 TestOutputWord.of("klaus","s"),
63 TestOutputWordCounter.of("klaus","s",1)),
65 TestOutputWord.of("peter","Boäh"),
66 TestOutputWordCounter.of("peter","Boäh",1)),
68 TestOutputWord.of("peter","Welt"),
69 TestOutputWordCounter.of("peter","Welt",2)),
71 TestOutputWord.of("peter","Boäh"),
72 TestOutputWordCounter.of("peter","Boäh",2)),
74 TestOutputWord.of("klaus","s"),
75 TestOutputWordCounter.of("klaus","s",2)),
77 TestOutputWord.of("peter","Boäh"),
78 TestOutputWordCounter.of("peter","Boäh",3)),
80 TestOutputWord.of("klaus","s"),
81 TestOutputWordCounter.of("klaus","s",3)),
84 static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
86 MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
88 .of(EXPECTED_MESSAGES)
89 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
90 return expectedMessages;
/**
 * Renders the last header stored under {@code key} as a {@code key=value} string.
 * <p>
 * NOTE(review): the statements between the header lookup and the return are
 * not visible in this listing - presumably they handle a missing header;
 * confirm against the complete file.
 *
 * @param headers the headers to search
 * @param key the header key to look up
 * @return the key, followed by {@code =} and the header value decoded as a string
 */
93 static String parseHeader(Headers headers, String key)
// Pick the last header that was recorded for the key.
95 Header header = headers.lastHeader(key);
// NOTE(review): new String(byte[]) decodes with the platform default charset
// on Java versions before 18 - consider passing StandardCharsets.UTF_8 explicitly.
102 return key + "=" + new String(header.value());