"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
- "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.TestOutputWord",
- "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
{
TestData
.getInputMessages()
- .forEach(word -> kafkaTemplate.send(TOPIC_IN, word.getUser(), word));
+ .forEach(kv -> kafkaTemplate.send(TOPIC_IN, kv.key, kv.value));
await("Expected messages")
.atMost(Duration.ofSeconds(10))
- .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
+ .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion()));
}
received.add(word, counter);
}
- synchronized MultiValueMap<TestOutputWord, TestOutputWordCounter> getReceivedMessages()
+ synchronized void enforceAssertion(
+ java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
{
- return received;
+ assertion.accept(received);
}
}