query: 1.0.6 - Added `QueryApplicationIT`
authorKai Moritz <kai@juplo.de>
Sun, 9 Jun 2024 15:03:59 +0000 (17:03 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 12 Jun 2024 20:24:53 +0000 (22:24 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/query/TestData.java

diff --git a/pom.xml b/pom.xml
index d74c826..d7f3a30 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,9 @@
 
        <build>
                <plugins>
+                       <plugin>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                       </plugin>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
index 3e205f6..c4ae4ae 100644 (file)
@@ -88,6 +88,11 @@ public class QueryStreamProcessor
                return topology;
        }
 
+       ReadOnlyKeyValueStore<String, String> getStore()
+       {
+               return streams.store(storeParameters);
+       }
+
        public Optional<URI> getRedirect(String username)
        {
                KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer());
@@ -108,7 +113,7 @@ public class QueryStreamProcessor
        {
                return
                                Optional
-                                               .ofNullable(streams.store(storeParameters).get(username))
+                                               .ofNullable(getStore().get(username))
                                                .map(json ->
                                                {
                                                        try
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
new file mode 100644 (file)
index 0000000..4e44cda
--- /dev/null
@@ -0,0 +1,98 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
+                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.properties.spring.json.add.type.headers=false",
+                               "logging.level.root=WARN",
+                               "logging.level.de.juplo=DEBUG",
+                               "logging.level.org.apache.kafka.clients=INFO",
+                               "logging.level.org.apache.kafka.streams=INFO",
+                               "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
+                               "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
+@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
+@Slf4j
+public class QueryApplicationIT
+{
+       public static final String TOPIC_TOP10 = "top10";
+       public static final String TOPIC_USERS = "users";
+
+       @Autowired
+       QueryStreamProcessor streamProcessor;
+
+
+       @BeforeAll
+       public static void testSendMessage(
+                       @Autowired KafkaTemplate<String, Object> kafkaTemplate)
+       {
+               TestData
+                               .getUsersMessages()
+                               .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
+               TestData
+                               .getTop10Messages()
+                               .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
+       }
+
+       private static void flush(CompletableFuture<SendResult<String, Object>> future)
+       {
+               try
+               {
+                       SendResult<String, Object> result = future.get();
+                       log.info(
+                                       "Sent: {}={}, partition={}, offset={}",
+                                       result.getProducerRecord().key(),
+                                       result.getProducerRecord().value(),
+                                       result.getRecordMetadata().partition(),
+                                       result.getRecordMetadata().offset());
+               }
+               catch (Exception e)
+               {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @DisplayName("Await the expected state in the state-store")
+       @Test
+       public void testAwaitExpectedState()
+       {
+               await("Expected state")
+                               .atMost(Duration.ofSeconds(5))
+                               .catchUncaughtExceptions()
+                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Primary
+               @Bean
+               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(STORE_NAME);
+               }
+       }
+}
index 82f7217..610bca0 100644 (file)
@@ -42,6 +42,11 @@ class TestData
 
        private static UserRanking userRankingOf(String json)
        {
+               if (json == null)
+               {
+                       return null;
+               }
+
                try
                {
                        return objectMapper.readValue(json, UserRanking.class);