top10: 1.1.2 - (RED) Added IT for the expected processing
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
6
7 import java.util.List;
8 import java.util.Map;
9 import java.util.Properties;
10 import java.util.function.BiConsumer;
11 import java.util.stream.Collectors;
12
13 import static org.assertj.core.api.Assertions.assertThat;
14
15
16 class TestData
17 {
18         static void writeInputData(BiConsumer<Key, Counter> consumer)
19         {
20                 consumer.accept(
21                                 Key.of("peter","Hallo"),
22                                 Counter.of("peter","Hallo",1));
23                 consumer.accept(
24                                 Key.of("klaus","Müsch"),
25                                 Counter.of("klaus","Müsch",1));
26                 consumer.accept(
27                                 Key.of("peter","Welt"),
28                                 Counter.of("peter","Welt",1));
29                 consumer.accept(
30                                 Key.of("klaus","Müsch"),
31                                 Counter.of("klaus","Müsch",2));
32                 consumer.accept(
33                                 Key.of("klaus","s"),
34                                 Counter.of("klaus","s",1));
35                 consumer.accept(
36                                 Key.of("peter","Boäh"),
37                                 Counter.of("peter","Boäh",1));
38                 consumer.accept(
39                                 Key.of("peter","Welt"),
40                                 Counter.of("peter","Welt",2));
41                 consumer.accept(
42                                 Key.of("peter","Boäh"),
43                                 Counter.of("peter","Boäh",2));
44                 consumer.accept(
45                                 Key.of("klaus","s"),
46                                 Counter.of("klaus","s",2));
47                 consumer.accept(
48                                 Key.of("peter","Boäh"),
49                                 Counter.of("peter","Boäh",3));
50                 consumer.accept(
51                                 Key.of("klaus","s"),
52                                 Counter.of("klaus","s",3));
53         }
54
55         static void assertExpectedResult(List<KeyValue<String, Ranking>> receivedMessages)
56         {
57                 assertThat(receivedMessages).hasSize(11);
58                 assertThat(receivedMessages).containsSubsequence(
59                                 expectedMessages[0],
60                                 expectedMessages[2],
61                                 expectedMessages[5],
62                                 expectedMessages[6],
63                                 expectedMessages[7]); // peter
64
65                 assertThat(receivedMessages).containsSubsequence(
66                                 expectedMessages[1],
67                                 expectedMessages[3],
68                                 expectedMessages[4],
69                                 expectedMessages[8],
70                                 expectedMessages[10]); // klaus
71         }
72
73         static KeyValue<String, Ranking>[] expectedMessages = new KeyValue[]
74         {
75                         KeyValue.pair( // 0
76                                         "peter",
77                                         Ranking.of(
78                                                         Entry.of("Hallo", 1l))),
79                         KeyValue.pair( // 1
80                                         "klaus",
81                                         Ranking.of(
82                                                         Entry.of("Müsch", 1l))),
83                         KeyValue.pair( // 2
84                                         "peter",
85                                         Ranking.of(
86                                                         Entry.of("Hallo", 1l),
87                                                         Entry.of("Welt", 1l))),
88                         KeyValue.pair( // 3
89                                         "klaus",
90                                         Ranking.of(
91                                                         Entry.of("Müsch", 2l))),
92                         KeyValue.pair( // 4
93                                         "klaus",
94                                         Ranking.of(
95                                                         Entry.of("Müsch", 2l),
96                                                         Entry.of("s", 1l))),
97                         KeyValue.pair( // 5
98                                         "peter",
99                                         Ranking.of(
100                                                         Entry.of("Hallo", 1l),
101                                                         Entry.of("Welt", 1l),
102                                                         Entry.of("Boäh", 1l))),
103                         KeyValue.pair( // 6
104                                         "peter",
105                                         Ranking.of(
106                                                         Entry.of("Welt", 2l),
107                                                         Entry.of("Hallo", 1l),
108                                                         Entry.of("Boäh", 1l))),
109                         KeyValue.pair( // 7
110                                         "peter",
111                                         Ranking.of(
112                                                         Entry.of("Welt", 2l),
113                                                         Entry.of("Boäh", 2l),
114                                                         Entry.of("Hallo", 1l))),
115                         KeyValue.pair( // 8
116                                         "klaus",
117                                         Ranking.of(
118                                                         Entry.of("Müsch", 2l),
119                                                         Entry.of("s", 2l))),
120                         KeyValue.pair( // 9
121                                         "peter",
122                                         Ranking.of(
123                                                         Entry.of("Boäh", 3l),
124                                                         Entry.of("Welt", 2l),
125                                                         Entry.of("Hallo", 1l))),
126                         KeyValue.pair( // 10
127                                         "klaus",
128                                         Ranking.of(
129                                                         Entry.of("s", 3l),
130                                                         Entry.of("Müsch", 2l))),
131         };
132
133         static Map<String, Object> convertToMap(Properties properties)
134         {
135                 return properties
136                                 .entrySet()
137                                 .stream()
138                                 .collect(
139                                                 Collectors.toMap(
140                                                                 entry -> (String)entry.getKey(),
141                                                                 entry -> entry.getValue()
142                                                 ));
143         }
144
145         static String parseHeader(Headers headers, String key)
146         {
147                 Header header = headers.lastHeader(key);
148                 if (header == null)
149                 {
150                         return key + "=null";
151                 }
152                 else
153                 {
154                         return key + "=" + new String(header.value());
155                 }
156         }
157 }