From: Kai Moritz Date: Sun, 26 May 2024 20:47:45 +0000 (+0200) Subject: WIP X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=refs%2Fheads%2Fsplitter;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index 1f20e10..ce8f506 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -28,9 +28,9 @@ import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { - "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false ", + "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.WordData", "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter", @@ -93,11 +93,11 @@ public class SplitterApplicationIT static void assertExpectedResult(MultiValueMap receivedMessages) { - MultiValueMap words = new LinkedMultiValueMap<>(); - expectedMessages.forEach(keyValue -> words.add(keyValue.key, keyValue.value)); + MultiValueMap expected = new LinkedMultiValueMap<>(); + expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value)); await("Received expected messages") - .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> words.forEach((user, word) -> + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> expected.forEach((user, word) -> assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); } @@ -142,7 +142,7 @@ public class SplitterApplicationIT static class Consumer { - final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive(