fb030e39479145e561e834b2c62d8320401d24c9
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import org.apache.kafka.common.header.Header;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.streams.KeyValue;
6
7 import java.util.List;
8 import java.util.Map;
9 import java.util.Properties;
10 import java.util.function.BiConsumer;
11 import java.util.stream.Collectors;
12
13 import static org.assertj.core.api.Assertions.assertThat;
14
15
16 class TestData
17 {
18         static void writeInputData(BiConsumer<Key, Counter> consumer)
19         {
20                 consumer.accept(
21                                 Key.of("peter","Hallo"),
22                                 Counter.of("peter","Hallo",1));
23                 consumer.accept(
24                                 Key.of("klaus","Müsch"),
25                                 Counter.of("klaus","Müsch",1));
26                 consumer.accept(
27                                 Key.of("peter","Welt"),
28                                 Counter.of("peter","Welt",1));
29                 consumer.accept(
30                                 Key.of("klaus","Müsch"),
31                                 Counter.of("klaus","Müsch",2));
32                 consumer.accept(
33                                 Key.of("klaus","s"),
34                                 Counter.of("klaus","s",1));
35                 consumer.accept(
36                                 Key.of("peter","Boäh"),
37                                 Counter.of("peter","Boäh",1));
38                 consumer.accept(
39                                 Key.of("peter","Welt"),
40                                 Counter.of("peter","Welt",2));
41                 consumer.accept(
42                                 Key.of("peter","Boäh"),
43                                 Counter.of("peter","Boäh",2));
44                 consumer.accept(
45                                 Key.of("klaus","s"),
46                                 Counter.of("klaus","s",2));
47                 consumer.accept(
48                                 Key.of("peter","Boäh"),
49                                 Counter.of("peter","Boäh",3));
50                 consumer.accept(
51                                 Key.of("klaus","s"),
52                                 Counter.of("klaus","s",3));
53         }
54
55         static void assertExpectedResult(List<KeyValue<String, Ranking>> receivedMessages)
56         {
57                 assertThat(receivedMessages).hasSize(11);
58                 assertThat(receivedMessages).containsSubsequence(
59                                 expectedMessages[0]); // Hallo
60                 assertThat(receivedMessages).containsSubsequence(
61                                 expectedMessages[1],
62                                 expectedMessages[3]); // Müsch
63                 assertThat(receivedMessages).containsSubsequence(
64                                 expectedMessages[2],
65                                 expectedMessages[6]);
66                 assertThat(receivedMessages).containsSubsequence(
67                                 expectedMessages[4],
68                                 expectedMessages[8],
69                                 expectedMessages[10]); // s
70                 assertThat(receivedMessages).containsSubsequence(
71                                 expectedMessages[5],
72                                 expectedMessages[7],
73                                 expectedMessages[9]); // Boäh
74         }
75
76         static KeyValue<String, Ranking>[] expectedMessages = new KeyValue[]
77         {
78                         KeyValue.pair(
79                                         "peter",
80                                         Ranking.of(
81                                                         Entry.of("Hallo", 1l))),
82                         KeyValue.pair(
83                                         "klaus",
84                                         Ranking.of(
85                                                         Entry.of("Müsch", 1l))),
86                         KeyValue.pair(
87                                         "peter",
88                                         Ranking.of(
89                                                         Entry.of("Hallo", 1l),
90                                                         Entry.of("Welt", 1l))),
91                         KeyValue.pair(
92                                         "klaus",
93                                         Ranking.of(
94                                                         Entry.of("Müsch", 2l))),
95                         KeyValue.pair(
96                                         "klaus",
97                                         Ranking.of(
98                                                         Entry.of("Müsch", 2l),
99                                                         Entry.of("s", 1l))),
100                         KeyValue.pair(
101                                         "peter",
102                                         Ranking.of(
103                                                         Entry.of("Hallo", 1l),
104                                                         Entry.of("Welt", 1l),
105                                                         Entry.of("Boäh", 1l))),
106                         KeyValue.pair(
107                                         "peter",
108                                         Ranking.of(
109                                                         Entry.of("Welt", 2l),
110                                                         Entry.of("Hallo", 1l),
111                                                         Entry.of("Boäh", 1l))),
112                         KeyValue.pair(
113                                         "peter",
114                                         Ranking.of(
115                                                         Entry.of("Welt", 2l),
116                                                         Entry.of("Boäh", 2l),
117                                                         Entry.of("Hallo", 1l))),
118                         KeyValue.pair(
119                                         "klaus",
120                                         Ranking.of(
121                                                         Entry.of("Müsch", 2l),
122                                                         Entry.of("s", 2l))),
123                         KeyValue.pair(
124                                         "peter",
125                                         Ranking.of(
126                                                         Entry.of("Boäh", 3l),
127                                                         Entry.of("Welt", 2l),
128                                                         Entry.of("Hallo", 1l))),
129                         KeyValue.pair(
130                                         "klaus",
131                                         Ranking.of(
132                                                         Entry.of("s", 3l),
133                                                         Entry.of("Müsch", 2l))),
134         };
135
136         static Map<String, Object> convertToMap(Properties properties)
137         {
138                 return properties
139                                 .entrySet()
140                                 .stream()
141                                 .collect(
142                                                 Collectors.toMap(
143                                                                 entry -> (String)entry.getKey(),
144                                                                 entry -> entry.getValue()
145                                                 ));
146         }
147
148         static String parseHeader(Headers headers, String key)
149         {
150                 Header header = headers.lastHeader(key);
151                 if (header == null)
152                 {
153                         return key + "=null";
154                 }
155                 else
156                 {
157                         return key + "=" + new String(header.value());
158                 }
159         }
160 }