counter: 1.2.15 - Refined `TestData.writeInputData(..)`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / TestData.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
6 import org.springframework.util.LinkedMultiValueMap;
7 import org.springframework.util.MultiValueMap;
8
9 import java.util.Map;
10 import java.util.Properties;
11 import java.util.function.BiConsumer;
12 import java.util.stream.Collectors;
13 import java.util.stream.Stream;
14
15 import static org.assertj.core.api.Assertions.assertThat;
16
17
18 class TestData
19 {
20         static void writeInputData(BiConsumer<String, Word> consumer)
21         {
22                 Stream
23                                 .of(inputMessagesArray)
24                                 .forEach(word -> consumer.accept(word.getUser(), word));
25         }
26
27         static Word[] inputMessagesArray = new Word[]
28         {
29                         Word.of("peter","Hallo"),
30                         Word.of("klaus","Müsch"),
31                         Word.of("peter","Welt"),
32                         Word.of("klaus","Müsch"),
33                         Word.of("klaus","s"),
34                         Word.of("peter","Boäh"),
35                         Word.of("peter","Welt"),
36                         Word.of("peter","Boäh"),
37                         Word.of("klaus","s"),
38                         Word.of("peter","Boäh"),
39                         Word.of("klaus","s"),
40         };
41
42         static void assertExpectedResult(MultiValueMap<Word, WordCounter> receivedMessages)
43         {
44                 expectedMessages.forEach(
45                                 (word, counter) ->
46                                                 assertThat(receivedMessages.get(word))
47                                                                 .containsExactlyElementsOf(counter));
48         }
49
50         static KeyValue<Word, WordCounter>[] expectedMessagesArray = new KeyValue[]
51         {
52                         KeyValue.pair(
53                                         Word.of("peter","Hallo"),
54                                         WordCounter.of("peter","Hallo",1)),
55                         KeyValue.pair(
56                                         Word.of("klaus","Müsch"),
57                                         WordCounter.of("klaus","Müsch",1)),
58                         KeyValue.pair(
59                                         Word.of("peter","Welt"),
60                                         WordCounter.of("peter","Welt",1)),
61                         KeyValue.pair(
62                                         Word.of("klaus","Müsch"),
63                                         WordCounter.of("klaus","Müsch",2)),
64                         KeyValue.pair(
65                                         Word.of("klaus","s"),
66                                         WordCounter.of("klaus","s",1)),
67                         KeyValue.pair(
68                                         Word.of("peter","Boäh"),
69                                         WordCounter.of("peter","Boäh",1)),
70                         KeyValue.pair(
71                                         Word.of("peter","Welt"),
72                                         WordCounter.of("peter","Welt",2)),
73                         KeyValue.pair(
74                                         Word.of("peter","Boäh"),
75                                         WordCounter.of("peter","Boäh",2)),
76                         KeyValue.pair(
77                                         Word.of("klaus","s"),
78                                         WordCounter.of("klaus","s",2)),
79                         KeyValue.pair(
80                                         Word.of("peter","Boäh"),
81                                         WordCounter.of("peter","Boäh",3)),
82                         KeyValue.pair(
83                                         Word.of("klaus","s"),
84                                         WordCounter.of("klaus","s",3)),
85         };
86
87         static MultiValueMap<Word, WordCounter> expectedMessages;
88         static
89         {
90                 expectedMessages = new LinkedMultiValueMap<>();
91                 Stream
92                                 .of(expectedMessagesArray)
93                                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
94         }
95
96         static Map<String, Object> convertToMap(Properties properties)
97         {
98                 return properties
99                                 .entrySet()
100                                 .stream()
101                                 .collect(
102                                                 Collectors.toMap(
103                                                                 entry -> (String)entry.getKey(),
104                                                                 entry -> entry.getValue()
105                                                 ));
106         }
107
108         static String parseHeader(Headers headers, String key)
109         {
110                 Header header = headers.lastHeader(key);
111                 if (header == null)
112                 {
113                         return key + "=null";
114                 }
115                 else
116                 {
117                         return key + "=" + new String(header.value());
118                 }
119         }
120 }