WIP
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / TestData.java
1 package de.juplo.kafka.wordcount.top10;
2
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
14
15
16 class TestData
17 {
18         static void writeInputData(BiConsumer<Key, Counter> consumer)
19         {
20                 consumer.accept(
21                                 Key.of("peter","Hallo"),
22                                 Counter.of("peter","Hallo",1));
23                 consumer.accept(
24                                 Key.of("klaus","Müsch"),
25                                 Counter.of("klaus","Müsch",1));
26                 consumer.accept(
27                                 Key.of("peter","Welt"),
28                                 Counter.of("peter","Welt",1));
29                 consumer.accept(
30                                 Key.of("klaus","Müsch"),
31                                 Counter.of("klaus","Müsch",2));
32                 consumer.accept(
33                                 Key.of("klaus","s"),
34                                 Counter.of("klaus","s",1));
35                 consumer.accept(
36                                 Key.of("peter","Boäh"),
37                                 Counter.of("peter","Boäh",1));
38                 consumer.accept(
39                                 Key.of("peter","Welt"),
40                                 Counter.of("peter","Welt",2));
41                 consumer.accept(
42                                 Key.of("peter","Boäh"),
43                                 Counter.of("peter","Boäh",2));
44                 consumer.accept(
45                                 Key.of("klaus","s"),
46                                 Counter.of("klaus","s",2));
47                 consumer.accept(
48                                 Key.of("peter","Boäh"),
49                                 Counter.of("peter","Boäh",3));
50                 consumer.accept(
51                                 Key.of("klaus","s"),
52                                 Counter.of("klaus","s",3));
53         }
54
55         static void assertExpectedResult(List<KeyValue<String, Ranking>> receivedMessages)
56         {
57                 assertThat(receivedMessages).hasSize(11);
58                 assertThat(receivedMessages).containsSubsequence(
59                                 expectedMessages[0]); // Hallo
60                 assertThat(receivedMessages).containsSubsequence(
61                                 expectedMessages[1],
62                                 expectedMessages[3]); // Müsch
63                 assertThat(receivedMessages).containsSubsequence(
64                                 expectedMessages[2],
65                                 expectedMessages[6]);
66                 assertThat(receivedMessages).containsSubsequence(
67                                 expectedMessages[4],
68                                 expectedMessages[8],
69                                 expectedMessages[10]); // s
70                 assertThat(receivedMessages).containsSubsequence(
71                                 expectedMessages[5],
72                                 expectedMessages[7],
73                                 expectedMessages[9]); // Boäh
74         }
75
76         static KeyValue<String, Ranking>[] expectedMessages = new KeyValue[]
77         {
78                         KeyValue.pair(
79                                         "peter",
80                                         Ranking.of("peter","Hallo",1)),
81         };
82
83         static Map<String, Object> convertToMap(Properties properties)
84         {
85                 return properties
86                                 .entrySet()
87                                 .stream()
88                                 .collect(
89                                                 Collectors.toMap(
90                                                                 entry -> (String)entry.getKey(),
91                                                                 entry -> entry.getValue()
92                                                 ));
93         }
94
95         static String parseHeader(Headers headers, String key)
96         {
97                 Header header = headers.lastHeader(key);
98                 if (header == null)
99                 {
100                         return key + "=null";
101                 }
102                 else
103                 {
104                         return key + "=" + new String(header.value());
105                 }
106         }
107 }