From 2b22a006cc57203406c8589687a6c729ebdbf40c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 17:03:59 +0200 Subject: [PATCH] query: 1.0.6 - Added `QueryApplicationIT` --- pom.xml | 3 + .../wordcount/query/QueryStreamProcessor.java | 7 +- .../wordcount/query/QueryApplicationIT.java | 98 +++++++++++++++++++ .../juplo/kafka/wordcount/query/TestData.java | 5 + 4 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java diff --git a/pom.xml b/pom.xml index d74c826..d7f3a30 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,9 @@ + + maven-failsafe-plugin + org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 3e205f6..c4ae4ae 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -88,6 +88,11 @@ public class QueryStreamProcessor return topology; } + ReadOnlyKeyValueStore getStore() + { + return streams.store(storeParameters); + } + public Optional getRedirect(String username) { KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer()); @@ -108,7 +113,7 @@ public class QueryStreamProcessor { return Optional - .ofNullable(streams.store(storeParameters).get(username)) + .ofNullable(getStore().get(username)) .map(json -> { try diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java new file mode 100644 index 0000000..4e44cda --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -0,0 +1,98 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "logging.level.org.apache.kafka.clients=INFO", + "logging.level.org.apache.kafka.streams=INFO", + "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, + "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) +@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS}) +@Slf4j +public class QueryApplicationIT +{ + public static final String TOPIC_TOP10 = "top10"; + public static final String TOPIC_USERS = "users"; + + @Autowired + QueryStreamProcessor streamProcessor; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) + { + TestData + .getUsersMessages() + .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); + TestData + .getTop10Messages() + .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); + } + + private static void flush(CompletableFuture> future) + { + try + { + SendResult result = future.get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .catchUncaughtExceptions() + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + } + + @TestConfiguration + static class Configuration + { + @Primary + @Bean + KeyValueBytesStoreSupplier inMemoryStoreSupplier() + { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index 82f7217..610bca0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -42,6 +42,11 @@ class TestData private static UserRanking userRankingOf(String json) { + if (json == null) + { + return null; + } + try { return objectMapper.readValue(json, UserRanking.class); -- 2.20.1