1 package de.juplo.kafka.wordcount.counter;
3 import org.apache.kafka.streams.KeyValue;
6 import java.util.function.BiConsumer;
8 import static org.assertj.core.api.Assertions.assertThat;
13 static void writeInputData(BiConsumer<String, Word> consumer)
17 Word.of("peter","Hallo"));
20 Word.of("klaus","Müsch"));
23 Word.of("peter","Welt"));
26 Word.of("klaus","Müsch"));
29 Word.of("klaus","s"));
32 Word.of("peter","Boäh"));
35 Word.of("peter","Welt"));
38 Word.of("peter","Boäh"));
41 Word.of("klaus","s"));
44 Word.of("peter","Boäh"));
47 Word.of("klaus","s"));
50 static void assertExpectedResult(List<KeyValue<Word, WordCount>> receivedMessages)
52 assertThat(receivedMessages).hasSize(11);
53 assertThat(receivedMessages).containsSubsequence(
54 expectedMessages[0]); // Hallo
55 assertThat(receivedMessages).containsSubsequence(
57 expectedMessages[3]); // Müsch
58 assertThat(receivedMessages).containsSubsequence(
61 assertThat(receivedMessages).containsSubsequence(
64 expectedMessages[10]); // s
65 assertThat(receivedMessages).containsSubsequence(
68 expectedMessages[9]); // Boäh
71 static KeyValue<Word,WordCount>[] expectedMessages = new KeyValue[]
74 Word.of("peter","Hallo"),
75 WordCount.of("peter","Hallo",1)),
77 Word.of("klaus","Müsch"),
78 WordCount.of("klaus","Müsch",1)),
80 Word.of("peter","Welt"),
81 WordCount.of("peter","Welt",1)),
83 Word.of("klaus","Müsch"),
84 WordCount.of("klaus","Müsch",2)),
87 WordCount.of("klaus","s",1)),
89 Word.of("peter","Boäh"),
90 WordCount.of("peter","Boäh",1)),
92 Word.of("peter","Welt"),
93 WordCount.of("peter","Welt",2)),
95 Word.of("peter","Boäh"),
96 WordCount.of("peter","Boäh",2)),
99 WordCount.of("klaus","s",2)),
101 Word.of("peter","Boäh"),
102 WordCount.of("peter","Boäh",3)),
104 Word.of("klaus","s"),
105 WordCount.of("klaus","s",3)),