X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryApplicationIT.java;h=58a12068c2ce71566889c84ea7ffae280f5cc6d7;hb=f18423d411650c6f08c9b698b92c33c42bdd670f;hp=4e44cdaa2f4d7524ad6dd1eea57a157dc9bde1e3;hpb=2b22a006cc57203406c8589687a6c729ebdbf40c;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 4e44cda..58a1206 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.query; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -7,33 +8,43 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; +import org.springframework.http.MediaType; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @SpringBootTest( properties = { + "spring.main.allow-bean-definition-overriding=true", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", "logging.level.org.apache.kafka.streams=INFO", "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.query.commit-interval=100", + "juplo.wordcount.query.cache-max-bytes=0", "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) +@AutoConfigureMockMvc @EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS}) @Slf4j public class QueryApplicationIT @@ -41,6 +52,11 @@ public class QueryApplicationIT public static final String TOPIC_TOP10 = "top10"; public static final String TOPIC_USERS = "users"; + + @Autowired + MockMvc mockMvc; + @Autowired + ObjectMapper objectMapper; @Autowired QueryStreamProcessor streamProcessor; @@ -75,24 +91,57 @@ public class QueryApplicationIT } } - @DisplayName("Await the expected state in the state-store") + @DisplayName("Await, that the expected state is visible in the state-store") @Test - public void testAwaitExpectedState() + public void testAwaitExpectedStateInStore() { - await("Expected state") + await("The expected state is visible in the state-store") .atMost(Duration.ofSeconds(5)) - .catchUncaughtExceptions() - .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user))); + } + + @DisplayName("Await, that the expected state is queryable") + @Test + public void testAwaitExpectedStateIsQueryable() + { + await("The expected state is queryable") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); + } + + private UserRanking requestUserRankingFor(String user) + { + try + { + return objectMapper.readValue( + mockMvc + .perform(MockMvcRequestBuilders.get("/{user}", user) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn() + .getResponse() + .getContentAsString(StandardCharsets.UTF_8), + UserRanking.class); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } @TestConfiguration static class Configuration { - @Primary @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() + KeyValueBytesStoreSupplier userStoreSupplier() + { + return Stores.inMemoryKeyValueStore(USER_STORE_NAME); + } + + @Bean + KeyValueBytesStoreSupplier rankingStoreSupplier() { - return Stores.inMemoryKeyValueStore(STORE_NAME); + return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); } } }