{
TestData
.getInputMessages()
- .forEach(word -> kafkaTemplate.send(TOPIC_IN, word.getUser(), word));
+ .forEach(kv -> kafkaTemplate.send(TOPIC_IN, kv.key, kv.value));
await("Expected messages")
.atMost(Duration.ofSeconds(10))
- .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
+ .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion()));
}
received.add(word, counter);
}
- synchronized MultiValueMap<TestOutputWord, TestOutputWordCounter> getReceivedMessages()
+ synchronized void enforceAssertion(
+ java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
{
- return received;
+ assertion.accept(received);
}
}