@SpringBootTest(
properties = {
- "spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false ",
+ "spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.WordData",
"spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter",
static void assertExpectedResult(MultiValueMap<String, WordData> receivedMessages)
{
- MultiValueMap<String, WordData> words = new LinkedMultiValueMap<>();
- expectedMessages.forEach(keyValue -> words.add(keyValue.key, keyValue.value));
+ MultiValueMap<String, WordData> expected = new LinkedMultiValueMap<>();
+ expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value));
await("Received expected messages")
- .atMost(Duration.ofSeconds(10))
- .untilAsserted(() -> words.forEach((user, word) ->
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> expected.forEach((user, word) ->
assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
}
static class Consumer
{
- final MultiValueMap<String, WordData> received = new LinkedMultiValueMap<>();
+ private final MultiValueMap<String, WordData> received = new LinkedMultiValueMap<>();
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(