counter: 1.3.1 - Splitted up test in `CounterStreamProcessorTopologyTest`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / TestData.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
6
7 import java.util.List;
8 import java.util.Map;
9 import java.util.Properties;
10 import java.util.function.BiConsumer;
11 import java.util.stream.Collectors;
12
13 import static org.assertj.core.api.Assertions.assertThat;
14
15
16 class TestData
17 {
18         static void writeInputData(BiConsumer<String, Word> consumer)
19         {
20                 consumer.accept(
21                                 "peter",
22                                 Word.of("peter","Hallo"));
23                 consumer.accept(
24                                 "klaus",
25                                 Word.of("klaus","Müsch"));
26                 consumer.accept(
27                                 "peter",
28                                 Word.of("peter","Welt"));
29                 consumer.accept(
30                                 "klaus",
31                                 Word.of("klaus","Müsch"));
32                 consumer.accept(
33                                 "klaus",
34                                 Word.of("klaus","s"));
35                 consumer.accept(
36                                 "peter",
37                                 Word.of("peter","Boäh"));
38                 consumer.accept(
39                                 "peter",
40                                 Word.of("peter","Welt"));
41                 consumer.accept(
42                                 "peter",
43                                 Word.of("peter","Boäh"));
44                 consumer.accept(
45                                 "klaus",
46                                 Word.of("klaus","s"));
47                 consumer.accept(
48                                 "peter",
49                                 Word.of("peter","Boäh"));
50                 consumer.accept(
51                                 "klaus",
52                                 Word.of("klaus","s"));
53         }
54
55         static void assertExpectedResult(List<KeyValue<Word, WordCounter>> receivedMessages)
56         {
57                 assertThat(receivedMessages).hasSize(11);
58                 assertThat(receivedMessages).containsSubsequence(
59                                 expectedMessages[0]); // Hallo
60                 assertThat(receivedMessages).containsSubsequence(
61                                 expectedMessages[1],
62                                 expectedMessages[3]); // Müsch
63                 assertThat(receivedMessages).containsSubsequence(
64                                 expectedMessages[2],
65                                 expectedMessages[6]);
66                 assertThat(receivedMessages).containsSubsequence(
67                                 expectedMessages[4],
68                                 expectedMessages[8],
69                                 expectedMessages[10]); // s
70                 assertThat(receivedMessages).containsSubsequence(
71                                 expectedMessages[5],
72                                 expectedMessages[7],
73                                 expectedMessages[9]); // Boäh
74         }
75
76         static KeyValue<Word, WordCounter>[] expectedMessages = new KeyValue[]
77         {
78                         KeyValue.pair(
79                                         Word.of("peter","Hallo"),
80                                         WordCounter.of("peter","Hallo",1)),
81                         KeyValue.pair(
82                                         Word.of("klaus","Müsch"),
83                                         WordCounter.of("klaus","Müsch",1)),
84                         KeyValue.pair(
85                                         Word.of("peter","Welt"),
86                                         WordCounter.of("peter","Welt",1)),
87                         KeyValue.pair(
88                                         Word.of("klaus","Müsch"),
89                                         WordCounter.of("klaus","Müsch",2)),
90                         KeyValue.pair(
91                                         Word.of("klaus","s"),
92                                         WordCounter.of("klaus","s",1)),
93                         KeyValue.pair(
94                                         Word.of("peter","Boäh"),
95                                         WordCounter.of("peter","Boäh",1)),
96                         KeyValue.pair(
97                                         Word.of("peter","Welt"),
98                                         WordCounter.of("peter","Welt",2)),
99                         KeyValue.pair(
100                                         Word.of("peter","Boäh"),
101                                         WordCounter.of("peter","Boäh",2)),
102                         KeyValue.pair(
103                                         Word.of("klaus","s"),
104                                         WordCounter.of("klaus","s",2)),
105                         KeyValue.pair(
106                                         Word.of("peter","Boäh"),
107                                         WordCounter.of("peter","Boäh",3)),
108                         KeyValue.pair(
109                                         Word.of("klaus","s"),
110                                         WordCounter.of("klaus","s",3)),
111         };
112
113         static Map<String, Object> convertToMap(Properties properties)
114         {
115                 return properties
116                                 .entrySet()
117                                 .stream()
118                                 .collect(
119                                                 Collectors.toMap(
120                                                                 entry -> (String)entry.getKey(),
121                                                                 entry -> entry.getValue()
122                                                 ));
123         }
124
125         static String parseHeader(Headers headers, String key)
126         {
127                 Header header = headers.lastHeader(key);
128                 if (header == null)
129                 {
130                         return key + "=null";
131                 }
132                 else
133                 {
134                         return key + "=" + new String(header.value());
135                 }
136         }
137 }