import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
+import java.util.function.Consumer;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
class TestData
{
- private static final TestInputWord[] INPUT_MESSAGES = new TestInputWord[]
+ static final String PETER = "peter";
+ static final String KLAUS = "klaus";
+
+ private static final KeyValue<String, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
{
- TestInputWord.of("peter","Hallo"),
- TestInputWord.of("klaus","Müsch"),
- TestInputWord.of("peter","Welt"),
- TestInputWord.of("klaus","Müsch"),
- TestInputWord.of("klaus","s"),
- TestInputWord.of("peter","Boäh"),
- TestInputWord.of("peter","Welt"),
- TestInputWord.of("peter","Boäh"),
- TestInputWord.of("klaus","s"),
- TestInputWord.of("peter","Boäh"),
- TestInputWord.of("klaus","s"),
+ new KeyValue<>(
+ PETER,
+ TestInputWord.of(PETER, "Hallo")),
+ new KeyValue<>(
+ KLAUS,
+ TestInputWord.of(KLAUS, "Müsch")),
+ new KeyValue<>(
+ PETER,
+ TestInputWord.of(PETER, "Welt")),
+ new KeyValue<>(
+ KLAUS,
+ TestInputWord.of(KLAUS, "Müsch")),
+ new KeyValue<>(
+ KLAUS,
+ TestInputWord.of(KLAUS, "s")),
+ new KeyValue<>(
+ PETER,
+ TestInputWord.of(PETER, "Boäh")),
+ new KeyValue<>(
+ PETER,
+ TestInputWord.of(PETER, "Welt")),
+ new KeyValue<>(
+ PETER,
+ TestInputWord.of(PETER, "Boäh")),
+ new KeyValue<>(
+ KLAUS,
+ TestInputWord.of(KLAUS, "s")),
+ new KeyValue<>(
+ PETER,
+ TestInputWord.of(PETER, "Boäh")),
+ new KeyValue<>(
+ KLAUS,
+ TestInputWord.of(KLAUS, "s")),
};
- static Stream<TestInputWord> getInputMessages()
+ static Stream<KeyValue<String, TestInputWord>> getInputMessages()
{
return Stream.of(TestData.INPUT_MESSAGES);
}
+ static Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> expectedMessagesAssertion()
+ {
+ return receivedMessages -> assertExpectedMessages(receivedMessages);
+ }
+
static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
{
expectedMessages().forEach(
private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair(
- TestOutputWord.of("peter","Hallo"),
- TestOutputWordCounter.of("peter","Hallo",1)),
+ TestOutputWord.of(PETER, "Hallo"),
+ TestOutputWordCounter.of(PETER, "Hallo",1)),
KeyValue.pair(
- TestOutputWord.of("klaus","Müsch"),
- TestOutputWordCounter.of("klaus","Müsch",1)),
+ TestOutputWord.of(KLAUS, "Müsch"),
+ TestOutputWordCounter.of(KLAUS, "Müsch",1)),
KeyValue.pair(
- TestOutputWord.of("peter","Welt"),
- TestOutputWordCounter.of("peter","Welt",1)),
+ TestOutputWord.of(PETER, "Welt"),
+ TestOutputWordCounter.of(PETER, "Welt",1)),
KeyValue.pair(
- TestOutputWord.of("klaus","Müsch"),
- TestOutputWordCounter.of("klaus","Müsch",2)),
+ TestOutputWord.of(KLAUS, "Müsch"),
+ TestOutputWordCounter.of(KLAUS, "Müsch",2)),
KeyValue.pair(
- TestOutputWord.of("klaus","s"),
- TestOutputWordCounter.of("klaus","s",1)),
+ TestOutputWord.of(KLAUS, "s"),
+ TestOutputWordCounter.of(KLAUS, "s",1)),
KeyValue.pair(
- TestOutputWord.of("peter","Boäh"),
- TestOutputWordCounter.of("peter","Boäh",1)),
+ TestOutputWord.of(PETER, "Boäh"),
+ TestOutputWordCounter.of(PETER, "Boäh",1)),
KeyValue.pair(
- TestOutputWord.of("peter","Welt"),
- TestOutputWordCounter.of("peter","Welt",2)),
+ TestOutputWord.of(PETER, "Welt"),
+ TestOutputWordCounter.of(PETER, "Welt",2)),
KeyValue.pair(
- TestOutputWord.of("peter","Boäh"),
- TestOutputWordCounter.of("peter","Boäh",2)),
+ TestOutputWord.of(PETER, "Boäh"),
+ TestOutputWordCounter.of(PETER, "Boäh",2)),
KeyValue.pair(
- TestOutputWord.of("klaus","s"),
- TestOutputWordCounter.of("klaus","s",2)),
+ TestOutputWord.of(KLAUS, "s"),
+ TestOutputWordCounter.of(KLAUS, "s",2)),
KeyValue.pair(
- TestOutputWord.of("peter","Boäh"),
- TestOutputWordCounter.of("peter","Boäh",3)),
+ TestOutputWord.of(PETER, "Boäh"),
+ TestOutputWordCounter.of(PETER, "Boäh",3)),
KeyValue.pair(
- TestOutputWord.of("klaus","s"),
- TestOutputWordCounter.of("klaus","s",3)),
+ TestOutputWord.of(KLAUS, "s"),
+ TestOutputWordCounter.of(KLAUS, "s",3)),
};
static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
return expectedMessages;
}
-
- static String parseHeader(Headers headers, String key)
- {
- Header header = headers.lastHeader(key);
- if (header == null)
- {
- return key + "=null";
- }
- else
- {
- return key + "=" + new String(header.value());
- }
- }
}