1 package de.juplo.kafka.wordcount.counter;
3 import org.apache.kafka.streams.KeyValue;
7 import java.util.Properties;
8 import java.util.function.BiConsumer;
9 import java.util.stream.Collectors;
11 import static org.assertj.core.api.Assertions.assertThat;
16 static void writeInputData(BiConsumer<String, Word> consumer)
20 Word.of("peter","Hallo"));
23 Word.of("klaus","Müsch"));
26 Word.of("peter","Welt"));
29 Word.of("klaus","Müsch"));
32 Word.of("klaus","s"));
35 Word.of("peter","Boäh"));
38 Word.of("peter","Welt"));
41 Word.of("peter","Boäh"));
44 Word.of("klaus","s"));
47 Word.of("peter","Boäh"));
50 Word.of("klaus","s"));
53 static void assertExpectedResult(List<KeyValue<Word, WordCounter>> receivedMessages)
55 assertThat(receivedMessages).hasSize(11);
56 assertThat(receivedMessages).containsSubsequence(
57 expectedMessages[0]); // Hallo
58 assertThat(receivedMessages).containsSubsequence(
60 expectedMessages[3]); // Müsch
61 assertThat(receivedMessages).containsSubsequence(
64 assertThat(receivedMessages).containsSubsequence(
67 expectedMessages[10]); // s
68 assertThat(receivedMessages).containsSubsequence(
71 expectedMessages[9]); // Boäh
74 static KeyValue<Word, WordCounter>[] expectedMessages = new KeyValue[]
77 Word.of("peter","Hallo"),
78 WordCounter.of("peter","Hallo",1)),
80 Word.of("klaus","Müsch"),
81 WordCounter.of("klaus","Müsch",1)),
83 Word.of("peter","Welt"),
84 WordCounter.of("peter","Welt",1)),
86 Word.of("klaus","Müsch"),
87 WordCounter.of("klaus","Müsch",2)),
90 WordCounter.of("klaus","s",1)),
92 Word.of("peter","Boäh"),
93 WordCounter.of("peter","Boäh",1)),
95 Word.of("peter","Welt"),
96 WordCounter.of("peter","Welt",2)),
98 Word.of("peter","Boäh"),
99 WordCounter.of("peter","Boäh",2)),
101 Word.of("klaus","s"),
102 WordCounter.of("klaus","s",2)),
104 Word.of("peter","Boäh"),
105 WordCounter.of("peter","Boäh",3)),
107 Word.of("klaus","s"),
108 WordCounter.of("klaus","s",3)),
111 static Map<String, Object> convertToMap(Properties properties)
118 entry -> (String)entry.getKey(),
119 entry -> entry.getValue()