@Test
void testSendMessage()
{
- TestData.writeInputData((key, value) -> kafkaTemplate.send(TOPIC_IN, key, value));
+ TestData.injectInputMessages((key, value) -> kafkaTemplate.send(TOPIC_IN, key, value));
- await("Expexted converted data")
+ await("Expected messages")
.atMost(Duration.ofSeconds(10))
- .untilAsserted(() -> TestData.assertExpectedResult(consumer.getReceivedMessages()));
+ .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
}
(JsonDeserializer<Word>)keySerde.deserializer(),
(JsonDeserializer<WordCounter>)valueSerde.deserializer());
- TestData.writeInputData((key, value) -> in.pipeInput(key, value));
+ TestData.injectInputMessages((key, value) -> in.pipeInput(key, value));
MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
out
receivedMessages.add(record.key(), record.value());
});
- TestData.assertExpectedResult(receivedMessages);
+ TestData.assertExpectedMessages(receivedMessages);
}
}
class TestData
{
- static void writeInputData(BiConsumer<String, Word> consumer)
+ static void injectInputMessages(BiConsumer<String, Word> consumer)
{
Stream
.of(inputMessagesArray)
Word.of("klaus","s"),
};
- static void assertExpectedResult(MultiValueMap<Word, WordCounter> receivedMessages)
+ static void assertExpectedMessages(MultiValueMap<Word, WordCounter> receivedMessages)
{
expectedMessages.forEach(
(word, counter) ->