From 0b34b4d8eec93307ae521b423397d46fb3055827 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 27 May 2024 23:32:49 +0200 Subject: [PATCH] top10: 1.2.1 - (RED) Implemented integration-test `Top10ApplicationIT` --- pom.xml | 2 +- .../kafka/wordcount/query/TestRanking.java | 11 ++ .../wordcount/top10/Top10ApplicationIT.java | 121 ++++++++++++++++++ 3 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java diff --git a/pom.xml b/pom.xml index fd71ccd..b30c4ea 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.2.0 + 1.2.1 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java b/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java new file mode 100644 index 0000000..e7f8053 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.wordcount.query; + +import de.juplo.kafka.wordcount.top10.Entry; +import lombok.Data; + + +@Data +public class TestRanking +{ + private Entry[] entries; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java new file mode 100644 index 0000000..2d45f03 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -0,0 +1,121 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.query.TestRanking; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.use.type.headers=false", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.TestRanking", + "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10 ", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN, + "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT }) +@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT }) +@Slf4j +public class Top10ApplicationIT +{ + public static final String TOPIC_IN = "in"; + public static final String TOPIC_OUT = "out"; + + @Autowired + Consumer consumer; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) + { + Stream + .of(TestData.INPUT_MESSAGES) + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + + @DisplayName("Await the expected output messages") + @Test + public void testAwaitExpectedMessages() + { + await("Expected messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages())); + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) User user, + @Payload TestRanking ranking) + { + log.debug("Received message: {} -> {}", user, ranking); + received.add(user, Ranking.of(ranking.getEntries())); + } + + synchronized MultiValueMap getReceivedMessages() + { + return received; + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + } +} -- 2.20.1